fix file write race condition
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobSerializerDeserializerContainer.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobSerializerDeserializerContainer.java
index 0ce2346..57c9133 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobSerializerDeserializerContainer.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobSerializerDeserializerContainer.java
@@ -25,7 +25,7 @@
* @param deploymentId
* @return
*/
- public IJobSerializerDeserializer getJobSerializerDeerializer(DeploymentId deploymentId);
+ public IJobSerializerDeserializer getJobSerializerDeserializer(DeploymentId deploymentId);
/**
* Add a deployment with the job serializer deserializer
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSerializerDeserializerContainer.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSerializerDeserializerContainer.java
index 35a1e8b..f2d8fff 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSerializerDeserializerContainer.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSerializerDeserializerContainer.java
@@ -15,18 +15,18 @@
package edu.uci.ics.hyracks.api.job;
-import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import edu.uci.ics.hyracks.api.deployment.DeploymentId;
public class JobSerializerDeserializerContainer implements IJobSerializerDeserializerContainer {
private IJobSerializerDeserializer defaultJobSerDe = new JobSerializerDeserializer();
- private Map<DeploymentId, IJobSerializerDeserializer> jobSerializerDeserializerMap = new HashMap<DeploymentId, IJobSerializerDeserializer>();
+ private Map<DeploymentId, IJobSerializerDeserializer> jobSerializerDeserializerMap = new ConcurrentHashMap<DeploymentId, IJobSerializerDeserializer>();
@Override
- public synchronized IJobSerializerDeserializer getJobSerializerDeerializer(DeploymentId deploymentId) {
+ public synchronized IJobSerializerDeserializer getJobSerializerDeserializer(DeploymentId deploymentId) {
if (deploymentId == null) {
return defaultJobSerDe;
}
@@ -44,4 +44,9 @@
jobSerializerDeserializerMap.remove(deploymentId);
}
+ @Override
+ public String toString() {
+ return jobSerializerDeserializerMap.toString();
+ }
+
}
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/deployment/ClassLoaderJobSerializerDeserializer.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/deployment/ClassLoaderJobSerializerDeserializer.java
index 3a35c25..9be3818 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/deployment/ClassLoaderJobSerializerDeserializer.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/deployment/ClassLoaderJobSerializerDeserializer.java
@@ -31,7 +31,6 @@
* This is the IJobSerializerDeserializer implementation for jobs with dynamic deployed jars.
*
* @author yingyib
- *
*/
public class ClassLoaderJobSerializerDeserializer implements IJobSerializerDeserializer {
@@ -99,4 +98,9 @@
public ClassLoader getClassLoader() throws HyracksException {
return classLoader;
}
+
+ @Override
+ public String toString() {
+ return classLoader.toString();
+ }
}
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/deployment/DeploymentUtils.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/deployment/DeploymentUtils.java
index 0677e2e..0724a01 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/deployment/DeploymentUtils.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/deployment/DeploymentUtils.java
@@ -89,7 +89,7 @@
*/
public static void deploy(DeploymentId deploymentId, List<URL> urls, IJobSerializerDeserializerContainer container,
ServerContext ctx, boolean isNC) throws HyracksException {
- IJobSerializerDeserializer jobSerDe = container.getJobSerializerDeerializer(deploymentId);
+ IJobSerializerDeserializer jobSerDe = container.getJobSerializerDeserializer(deploymentId);
if (jobSerDe == null) {
jobSerDe = new ClassLoaderJobSerializerDeserializer();
container.addJobSerializerDeserializer(deploymentId, jobSerDe);
@@ -116,7 +116,7 @@
try {
IJobSerializerDeserializerContainer jobSerDeContainer = appCtx.getJobSerializerDeserializerContainer();
IJobSerializerDeserializer jobSerDe = deploymentId == null ? null : jobSerDeContainer
- .getJobSerializerDeerializer(deploymentId);
+ .getJobSerializerDeserializer(deploymentId);
Object obj = jobSerDe == null ? JavaSerializationUtils.deserialize(bytes) : jobSerDe.deserialize(bytes);
return obj;
} catch (Exception e) {
@@ -138,7 +138,7 @@
try {
IJobSerializerDeserializerContainer jobSerDeContainer = appCtx.getJobSerializerDeserializerContainer();
IJobSerializerDeserializer jobSerDe = deploymentId == null ? null : jobSerDeContainer
- .getJobSerializerDeerializer(deploymentId);
+ .getJobSerializerDeserializer(deploymentId);
Class<?> cl = jobSerDe == null ? JavaSerializationUtils.loadClass(className) : jobSerDe
.loadClass(className);
return cl;
@@ -160,7 +160,7 @@
try {
IJobSerializerDeserializerContainer jobSerDeContainer = appCtx.getJobSerializerDeserializerContainer();
IJobSerializerDeserializer jobSerDe = deploymentId == null ? null : jobSerDeContainer
- .getJobSerializerDeerializer(deploymentId);
+ .getJobSerializerDeserializer(deploymentId);
ClassLoader cl = jobSerDe == null ? DeploymentUtils.class.getClassLoader() : jobSerDe.getClassLoader();
return cl;
} catch (Exception e) {
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
index 56b6654..586c1a6 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
@@ -327,4 +327,8 @@
throw new RuntimeException(e);
}
}
+
+ public DeploymentId getDeploymentId() {
+ return deploymentId;
+ }
}
\ No newline at end of file
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
index 26cb8d0..28d424a 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
@@ -258,8 +258,13 @@
@Override
final public void readFields(DataInput in) throws IOException {
reset();
- if (vertexId == null)
+ if (vertexId == null) {
+ if (getContext().getConfiguration().getClassLoader() != this.getClass().getClassLoader()) {
+ throw new IllegalStateException("mismatched classloader: "
+ + getContext().getConfiguration().getClassLoader() + " and " + this.getClass().getClassLoader());
+ }
vertexId = BspUtils.<I> createVertexIndex(getContext().getConfiguration());
+ }
vertexId.readFields(in);
delegate.setVertexId(vertexId);
boolean hasVertexValue = in.readBoolean();
@@ -576,15 +581,6 @@
return context;
}
- /**
- * Pregelix internal use only
- *
- * @param context
- */
- public static final void setContext(TaskAttemptContext context) {
- Vertex.context = context;
- }
-
@Override
public int hashCode() {
return vertexId.hashCode();
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
index 6493127..e79d4d2 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
@@ -37,11 +37,6 @@
* them.
*/
public class BspUtils {
- private static Configuration defaultConf = null;
-
- public static void setDefaultConfiguration(Configuration conf) {
- defaultConf = conf;
- }
/**
* Get the user's subclassed {@link VertexInputFormat}.
@@ -209,8 +204,6 @@
*/
@SuppressWarnings("unchecked")
public static <I extends Writable> Class<I> getVertexIndexClass(Configuration conf) {
- if (conf == null)
- conf = defaultConf;
return (Class<I>) conf.getClass(PregelixJob.VERTEX_INDEX_CLASS, WritableComparable.class);
}
@@ -302,8 +295,6 @@
*/
@SuppressWarnings("unchecked")
public static <M extends WritableSizable> Class<M> getMessageValueClass(Configuration conf) {
- if (conf == null)
- conf = defaultConf;
return (Class<M>) conf.getClass(PregelixJob.MESSAGE_VALUE_CLASS, Writable.class);
}
@@ -316,8 +307,6 @@
*/
@SuppressWarnings("unchecked")
public static <M extends Writable> Class<M> getPartialAggregateValueClass(Configuration conf) {
- if (conf == null)
- conf = defaultConf;
return (Class<M>) conf.getClass(PregelixJob.PARTIAL_AGGREGATE_VALUE_CLASS, Writable.class);
}
@@ -330,8 +319,6 @@
*/
@SuppressWarnings("unchecked")
public static <M extends Writable> Class<M> getPartialCombineValueClass(Configuration conf) {
- if (conf == null)
- conf = defaultConf;
return (Class<M>) conf.getClass(PregelixJob.PARTIAL_COMBINE_VALUE_CLASS, Writable.class);
}
@@ -344,8 +331,6 @@
*/
@SuppressWarnings("unchecked")
public static Class<? extends NormalizedKeyComputer> getNormalizedKeyComputerClass(Configuration conf) {
- if (conf == null)
- conf = defaultConf;
return (Class<? extends NormalizedKeyComputer>) conf.getClass(PregelixJob.NMK_COMPUTER_CLASS,
NormalizedKeyComputer.class);
}
@@ -359,8 +344,6 @@
*/
@SuppressWarnings("unchecked")
public static <M extends Writable> Class<M> getFinalAggregateValueClass(Configuration conf) {
- if (conf == null)
- conf = defaultConf;
return (Class<M>) conf.getClass(PregelixJob.FINAL_AGGREGATE_VALUE_CLASS, Writable.class);
}
@@ -488,8 +471,6 @@
*/
@SuppressWarnings({ "unchecked", "rawtypes" })
public static <V extends VertexPartitioner> Class<V> getVertexPartitionerClass(Configuration conf) {
- if (conf == null)
- conf = defaultConf;
return (Class<V>) conf.getClass(PregelixJob.PARTITIONER_CLASS, null, VertexPartitioner.class);
}
@@ -502,8 +483,6 @@
*/
@SuppressWarnings("unchecked")
public static <V extends ICheckpointHook> Class<V> getCheckpointHookClass(Configuration conf) {
- if (conf == null)
- conf = defaultConf;
return (Class<V>) conf.getClass(PregelixJob.CKP_CLASS, DefaultCheckpointHook.class, ICheckpointHook.class);
}
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
index 15981e7..8d9eeac 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
@@ -254,7 +254,7 @@
} catch (Exception e) {
throw new HyracksDataException(e);
}
- RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+ RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf,
vertexIdClass.getName(), vertexClass.getName());
IConfigurationFactory confFactory = new ConfigurationFactory(conf);
String[] readSchedule = ClusterConfig.getHdfsScheduler().getLocationConstraints(splits);
@@ -283,7 +283,7 @@
IFileSplitProvider resultFileSplitProvider = new ConstantFileSplitProvider(results);
IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(confFactory);
IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
- vertexIdClass.getName(), vertexClass.getName());
+ conf, vertexIdClass.getName(), vertexClass.getName());
VertexWriteOperatorDescriptor writer = new VertexWriteOperatorDescriptor(spec, inputRdFactory,
resultFileSplitProvider, preHookFactory, null);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, writer, new String[] { "nc1" });
@@ -325,7 +325,7 @@
* construct btree search operator
*/
IConfigurationFactory confFactory = new ConfigurationFactory(conf);
- RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+ RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf,
vertexIdClass.getName(), vertexClass.getName());
IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
comparatorFactories[0] = JobGenUtil.getIBinaryComparatorFactory(0, vertexIdClass);;
@@ -347,7 +347,7 @@
IFileSplitProvider resultFileSplitProvider = new ConstantFileSplitProvider(results);
IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(confFactory);
IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
- vertexIdClass.getName(), vertexClass.getName());
+ conf, vertexIdClass.getName(), vertexClass.getName());
VertexWriteOperatorDescriptor writer = new VertexWriteOperatorDescriptor(spec, inputRdFactory,
resultFileSplitProvider, preHookFactory, null);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, writer, new String[] { "nc1" });
@@ -478,7 +478,7 @@
return new VertexPartitionComputerFactory(confFactory);
} else {
return new VertexIdPartitionComputerFactory(new WritableSerializerDeserializerFactory(
- BspUtils.getVertexIndexClass(conf)));
+ BspUtils.getVertexIndexClass(conf)), confFactory);
}
}
@@ -514,7 +514,7 @@
} catch (Exception e) {
throw new HyracksDataException(e);
}
- RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+ RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf,
vertexIdClass.getName(), vertexClass.getName());
IConfigurationFactory confFactory = new ConfigurationFactory(conf);
String[] readSchedule = ClusterConfig.getHdfsScheduler().getLocationConstraints(splits);
@@ -585,7 +585,7 @@
* construct btree search operator
*/
IConfigurationFactory confFactory = new ConfigurationFactory(conf);
- RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+ RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf,
vertexIdClass.getName(), vertexClass.getName());
IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
comparatorFactories[0] = JobGenUtil.getIBinaryComparatorFactory(0, vertexIdClass);;
@@ -614,10 +614,11 @@
/**
* construct write file operator
*/
+ IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(confFactory);
IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
- vertexIdClass.getName(), vertexClass.getName());
+ conf, vertexIdClass.getName(), vertexClass.getName());
VertexFileWriteOperatorDescriptor writer = new VertexFileWriteOperatorDescriptor(spec, confFactory,
- inputRdFactory);
+ inputRdFactory, preHookFactory);
ClusterConfig.setLocationConstraint(spec, writer);
/**
@@ -656,7 +657,7 @@
/**
* source aggregate
*/
- RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+ RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf, vertexIdClass.getName(),
MsgList.class.getName());
/**
@@ -680,7 +681,7 @@
tmpJob.setOutputValueClass(MsgList.class);
IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
- vertexIdClass.getName(), MsgList.class.getName());
+ conf, vertexIdClass.getName(), MsgList.class.getName());
HDFSFileWriteOperatorDescriptor hdfsWriter = new HDFSFileWriteOperatorDescriptor(spec, tmpJob, inputRdFactory);
ClusterConfig.setLocationConstraint(spec, hdfsWriter);
@@ -720,7 +721,7 @@
} catch (Exception e) {
throw new HyracksDataException(e);
}
- RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+ RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf,
vertexIdClass.getName(), MsgList.class.getName());
String[] readSchedule = ClusterConfig.getHdfsScheduler().getLocationConstraints(splits);
HDFSReadOperatorDescriptor scanner = new HDFSReadOperatorDescriptor(spec, recordDescriptor, tmpJob, splits,
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
index 8c61e6a..9fefbea 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
@@ -125,7 +125,7 @@
/**
* construct btree search and function call update operator
*/
- RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+ RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf,
vertexIdClass.getName(), vertexClass.getName());
IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
comparatorFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, vertexIdClass);
@@ -134,20 +134,21 @@
typeTraits[0] = new TypeTraits(false);
typeTraits[1] = new TypeTraits(false);
- RecordDescriptor rdDummy = DataflowUtils.getRecordDescriptorFromWritableClasses(VLongWritable.class.getName());
- RecordDescriptor rdPartialAggregate = DataflowUtils
- .getRecordDescriptorFromWritableClasses(partialAggregateValueClass.getName());
+ RecordDescriptor rdDummy = DataflowUtils.getRecordDescriptorFromWritableClasses(conf,
+ VLongWritable.class.getName());
+ RecordDescriptor rdPartialAggregate = DataflowUtils.getRecordDescriptorFromWritableClasses(conf,
+ partialAggregateValueClass.getName());
IConfigurationFactory configurationFactory = new ConfigurationFactory(conf);
IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(configurationFactory);
IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
- vertexIdClass.getName(), vertexClass.getName());
- RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+ conf, vertexIdClass.getName(), vertexClass.getName());
+ RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf, vertexIdClass.getName(),
MsgList.class.getName());
- RecordDescriptor rdUnnestedMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+ RecordDescriptor rdUnnestedMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf,
vertexIdClass.getName(), messageValueClass.getName());
- RecordDescriptor rdInsert = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+ RecordDescriptor rdInsert = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf, vertexIdClass.getName(),
vertexClass.getName());
- RecordDescriptor rdDelete = DataflowUtils.getRecordDescriptorFromWritableClasses(vertexIdClass.getName());
+ RecordDescriptor rdDelete = DataflowUtils.getRecordDescriptorFromWritableClasses(conf, vertexIdClass.getName());
TreeSearchFunctionUpdateOperatorDescriptor scanner = new TreeSearchFunctionUpdateOperatorDescriptor(spec,
recordDescriptor, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
@@ -166,8 +167,8 @@
/**
* final aggregate write operator
*/
- IRecordDescriptorFactory aggRdFactory = DataflowUtils
- .getWritableRecordDescriptorFactoryFromWritableClasses(partialAggregateValueClass.getName());
+ IRecordDescriptorFactory aggRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
+ conf, partialAggregateValueClass.getName());
FinalAggregateOperatorDescriptor finalAggregator = new FinalAggregateOperatorDescriptor(spec,
configurationFactory, aggRdFactory, jobId);
PartitionConstraintHelper.addPartitionCountConstraint(spec, finalAggregator, 1);
@@ -311,15 +312,15 @@
* source aggregate
*/
int[] keyFields = new int[] { 0 };
- RecordDescriptor rdUnnestedMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+ RecordDescriptor rdUnnestedMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf,
vertexIdClass.getName(), messageValueClass.getName());
IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
comparatorFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, vertexIdClass);;
- RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+ RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf, vertexIdClass.getName(),
MsgList.class.getName());
- RecordDescriptor rdInsert = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+ RecordDescriptor rdInsert = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf, vertexIdClass.getName(),
vertexClass.getName());
- RecordDescriptor rdDelete = DataflowUtils.getRecordDescriptorFromWritableClasses(vertexIdClass.getName());
+ RecordDescriptor rdDelete = DataflowUtils.getRecordDescriptorFromWritableClasses(conf, vertexIdClass.getName());
/**
* construct empty tuple operator
@@ -360,13 +361,14 @@
* construct index-join-function-update operator
*/
IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
- RecordDescriptor rdDummy = DataflowUtils.getRecordDescriptorFromWritableClasses(VLongWritable.class.getName());
- RecordDescriptor rdPartialAggregate = DataflowUtils
- .getRecordDescriptorFromWritableClasses(partialAggregateValueClass.getName());
+ RecordDescriptor rdDummy = DataflowUtils.getRecordDescriptorFromWritableClasses(conf,
+ VLongWritable.class.getName());
+ RecordDescriptor rdPartialAggregate = DataflowUtils.getRecordDescriptorFromWritableClasses(conf,
+ partialAggregateValueClass.getName());
IConfigurationFactory configurationFactory = new ConfigurationFactory(conf);
IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(configurationFactory);
IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
- vertexIdClass.getName(), MsgList.class.getName(), vertexIdClass.getName(), vertexClass.getName());
+ conf, vertexIdClass.getName(), MsgList.class.getName(), vertexIdClass.getName(), vertexClass.getName());
IndexNestedLoopJoinFunctionUpdateOperatorDescriptor join = new IndexNestedLoopJoinFunctionUpdateOperatorDescriptor(
spec, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits, comparatorFactories,
@@ -445,8 +447,8 @@
/**
* final aggregate write operator
*/
- IRecordDescriptorFactory aggRdFactory = DataflowUtils
- .getWritableRecordDescriptorFactoryFromWritableClasses(partialAggregateValueClass.getName());
+ IRecordDescriptorFactory aggRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
+ conf, partialAggregateValueClass.getName());
FinalAggregateOperatorDescriptor finalAggregator = new FinalAggregateOperatorDescriptor(spec,
configurationFactory, aggRdFactory, jobId);
PartitionConstraintHelper.addPartitionCountConstraint(spec, finalAggregator, 1);
@@ -585,7 +587,7 @@
} catch (Exception e) {
throw new HyracksDataException(e);
}
- RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+ RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf,
vertexIdClass.getName(), MsgList.class.getName());
String[] readSchedule = ClusterConfig.getHdfsScheduler().getLocationConstraints(splits);
HDFSReadOperatorDescriptor scanner = new HDFSReadOperatorDescriptor(spec, recordDescriptor, tmpJob, splits,
@@ -662,7 +664,7 @@
/**
* construct btree search operator
*/
- RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+ RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf,
vertexIdClass.getName(), msgListClass.getName());
IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
comparatorFactories[0] = JobGenUtil.getIBinaryComparatorFactory(0, vertexIdClass);;
@@ -681,7 +683,7 @@
* construct write file operator
*/
IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
- vertexIdClass.getName(), MsgList.class.getName());
+ conf, vertexIdClass.getName(), MsgList.class.getName());
HDFSFileWriteOperatorDescriptor writer = new HDFSFileWriteOperatorDescriptor(spec, job, inputRdFactory);
ClusterConfig.setLocationConstraint(spec, writer);
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
index 8446379..ee576b1 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
@@ -96,7 +96,7 @@
/**
* construct btree search function update operator
*/
- RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+ RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf,
vertexIdClass.getName(), vertexClass.getName());
IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
comparatorFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, vertexIdClass);;
@@ -106,18 +106,19 @@
typeTraits[0] = new TypeTraits(false);
typeTraits[1] = new TypeTraits(false);
- RecordDescriptor rdDummy = DataflowUtils.getRecordDescriptorFromWritableClasses(VLongWritable.class.getName());
- RecordDescriptor rdPartialAggregate = DataflowUtils
- .getRecordDescriptorFromWritableClasses(partialAggregateValueClass.getName());
+ RecordDescriptor rdDummy = DataflowUtils.getRecordDescriptorFromWritableClasses(conf,
+ VLongWritable.class.getName());
+ RecordDescriptor rdPartialAggregate = DataflowUtils.getRecordDescriptorFromWritableClasses(conf,
+ partialAggregateValueClass.getName());
IConfigurationFactory configurationFactory = new ConfigurationFactory(conf);
IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(configurationFactory);
IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
- vertexIdClass.getName(), vertexClass.getName());
- RecordDescriptor rdUnnestedMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+ conf, vertexIdClass.getName(), vertexClass.getName());
+ RecordDescriptor rdUnnestedMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf,
vertexIdClass.getName(), messageValueClass.getName());
- RecordDescriptor rdInsert = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+ RecordDescriptor rdInsert = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf, vertexIdClass.getName(),
vertexClass.getName());
- RecordDescriptor rdDelete = DataflowUtils.getRecordDescriptorFromWritableClasses(vertexIdClass.getName());
+ RecordDescriptor rdDelete = DataflowUtils.getRecordDescriptorFromWritableClasses(conf, vertexIdClass.getName());
TreeSearchFunctionUpdateOperatorDescriptor scanner = new TreeSearchFunctionUpdateOperatorDescriptor(spec,
recordDescriptor, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
@@ -149,7 +150,7 @@
/**
* construct global group-by operator
*/
- RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+ RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf, vertexIdClass.getName(),
MsgList.class.getName());
IClusteredAggregatorDescriptorFactory aggregatorFactoryFinal = DataflowUtils.getAccumulatingAggregatorFactory(
conf, true, true);
@@ -183,8 +184,8 @@
/**
* final aggregate write operator
*/
- IRecordDescriptorFactory aggRdFactory = DataflowUtils
- .getWritableRecordDescriptorFactoryFromWritableClasses(partialAggregateValueClass.getName());
+ IRecordDescriptorFactory aggRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
+ conf, partialAggregateValueClass.getName());
FinalAggregateOperatorDescriptor finalAggregator = new FinalAggregateOperatorDescriptor(spec,
configurationFactory, aggRdFactory, jobId);
PartitionConstraintHelper.addPartitionCountConstraint(spec, finalAggregator, 1);
@@ -267,15 +268,15 @@
* source aggregate
*/
int[] keyFields = new int[] { 0 };
- RecordDescriptor rdUnnestedMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+ RecordDescriptor rdUnnestedMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf,
vertexIdClass.getName(), messageValueClass.getName());
IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
comparatorFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, vertexIdClass);
- RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+ RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf, vertexIdClass.getName(),
MsgList.class.getName());
- RecordDescriptor rdInsert = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+ RecordDescriptor rdInsert = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf, vertexIdClass.getName(),
vertexClass.getName());
- RecordDescriptor rdDelete = DataflowUtils.getRecordDescriptorFromWritableClasses(vertexIdClass.getName());
+ RecordDescriptor rdDelete = DataflowUtils.getRecordDescriptorFromWritableClasses(conf, vertexIdClass.getName());
/**
* construct empty tuple operator
@@ -309,13 +310,14 @@
nullWriterFactories[0] = VertexIdNullWriterFactory.INSTANCE;
nullWriterFactories[1] = MsgListNullWriterFactory.INSTANCE;
- RecordDescriptor rdDummy = DataflowUtils.getRecordDescriptorFromWritableClasses(VLongWritable.class.getName());
- RecordDescriptor rdPartialAggregate = DataflowUtils
- .getRecordDescriptorFromWritableClasses(partialAggregateValueClass.getName());
+ RecordDescriptor rdDummy = DataflowUtils.getRecordDescriptorFromWritableClasses(conf,
+ VLongWritable.class.getName());
+ RecordDescriptor rdPartialAggregate = DataflowUtils.getRecordDescriptorFromWritableClasses(conf,
+ partialAggregateValueClass.getName());
IConfigurationFactory configurationFactory = new ConfigurationFactory(conf);
IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(configurationFactory);
IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
- vertexIdClass.getName(), MsgList.class.getName(), vertexIdClass.getName(), vertexClass.getName());
+ conf, vertexIdClass.getName(), MsgList.class.getName(), vertexIdClass.getName(), vertexClass.getName());
IndexNestedLoopJoinFunctionUpdateOperatorDescriptor join = new IndexNestedLoopJoinFunctionUpdateOperatorDescriptor(
spec, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits, comparatorFactories,
@@ -379,8 +381,8 @@
/**
* final aggregate write operator
*/
- IRecordDescriptorFactory aggRdFactory = DataflowUtils
- .getWritableRecordDescriptorFactoryFromWritableClasses(partialAggregateValueClass.getName());
+ IRecordDescriptorFactory aggRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
+ conf, partialAggregateValueClass.getName());
FinalAggregateOperatorDescriptor finalAggregator = new FinalAggregateOperatorDescriptor(spec,
configurationFactory, aggRdFactory, jobId);
PartitionConstraintHelper.addPartitionCountConstraint(spec, finalAggregator, 1);
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
index 0259c5c..32271c8 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
@@ -96,7 +96,7 @@
/**
* construct btree search operator
*/
- RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+ RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf,
vertexIdClass.getName(), vertexClass.getName());
IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
comparatorFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, vertexIdClass);
@@ -109,18 +109,19 @@
/**
* construct compute operator
*/
- RecordDescriptor rdDummy = DataflowUtils.getRecordDescriptorFromWritableClasses(VLongWritable.class.getName());
- RecordDescriptor rdPartialAggregate = DataflowUtils
- .getRecordDescriptorFromWritableClasses(partialAggregateValueClass.getName());
+ RecordDescriptor rdDummy = DataflowUtils.getRecordDescriptorFromWritableClasses(conf,
+ VLongWritable.class.getName());
+ RecordDescriptor rdPartialAggregate = DataflowUtils.getRecordDescriptorFromWritableClasses(conf,
+ partialAggregateValueClass.getName());
IConfigurationFactory configurationFactory = new ConfigurationFactory(conf);
IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(configurationFactory);
IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
- vertexIdClass.getName(), vertexClass.getName());
- RecordDescriptor rdUnnestedMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+ conf, vertexIdClass.getName(), vertexClass.getName());
+ RecordDescriptor rdUnnestedMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf,
vertexIdClass.getName(), messageValueClass.getName());
- RecordDescriptor rdInsert = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+ RecordDescriptor rdInsert = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf, vertexIdClass.getName(),
vertexClass.getName());
- RecordDescriptor rdDelete = DataflowUtils.getRecordDescriptorFromWritableClasses(vertexIdClass.getName());
+ RecordDescriptor rdDelete = DataflowUtils.getRecordDescriptorFromWritableClasses(conf, vertexIdClass.getName());
TreeSearchFunctionUpdateOperatorDescriptor scanner = new TreeSearchFunctionUpdateOperatorDescriptor(spec,
recordDescriptor, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
@@ -144,7 +145,7 @@
/**
* construct global group-by operator
*/
- RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+ RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf, vertexIdClass.getName(),
MsgList.class.getName());
IClusteredAggregatorDescriptorFactory aggregatorFactoryFinal = DataflowUtils.getAccumulatingAggregatorFactory(
conf, true, false);
@@ -177,8 +178,8 @@
/**
* final aggregate write operator
*/
- IRecordDescriptorFactory aggRdFactory = DataflowUtils
- .getWritableRecordDescriptorFactoryFromWritableClasses(partialAggregateValueClass.getName());
+ IRecordDescriptorFactory aggRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
+ conf, partialAggregateValueClass.getName());
FinalAggregateOperatorDescriptor finalAggregator = new FinalAggregateOperatorDescriptor(spec,
configurationFactory, aggRdFactory, jobId);
PartitionConstraintHelper.addPartitionCountConstraint(spec, finalAggregator, 1);
@@ -258,15 +259,15 @@
* source aggregate
*/
int[] keyFields = new int[] { 0 };
- RecordDescriptor rdUnnestedMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+ RecordDescriptor rdUnnestedMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf,
vertexIdClass.getName(), messageValueClass.getName());
IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
comparatorFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, vertexIdClass);
- RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+ RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf, vertexIdClass.getName(),
MsgList.class.getName());
- RecordDescriptor rdInsert = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+ RecordDescriptor rdInsert = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf, vertexIdClass.getName(),
vertexClass.getName());
- RecordDescriptor rdDelete = DataflowUtils.getRecordDescriptorFromWritableClasses(vertexIdClass.getName());
+ RecordDescriptor rdDelete = DataflowUtils.getRecordDescriptorFromWritableClasses(conf, vertexIdClass.getName());
/**
* construct empty tuple operator
@@ -300,13 +301,14 @@
nullWriterFactories[0] = VertexIdNullWriterFactory.INSTANCE;
nullWriterFactories[1] = MsgListNullWriterFactory.INSTANCE;
- RecordDescriptor rdDummy = DataflowUtils.getRecordDescriptorFromWritableClasses(VLongWritable.class.getName());
- RecordDescriptor rdPartialAggregate = DataflowUtils
- .getRecordDescriptorFromWritableClasses(partialAggregateValueClass.getName());
+ RecordDescriptor rdDummy = DataflowUtils.getRecordDescriptorFromWritableClasses(conf,
+ VLongWritable.class.getName());
+ RecordDescriptor rdPartialAggregate = DataflowUtils.getRecordDescriptorFromWritableClasses(conf,
+ partialAggregateValueClass.getName());
IConfigurationFactory configurationFactory = new ConfigurationFactory(conf);
IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(configurationFactory);
IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
- vertexIdClass.getName(), MsgList.class.getName(), vertexIdClass.getName(), vertexClass.getName());
+ conf, vertexIdClass.getName(), MsgList.class.getName(), vertexIdClass.getName(), vertexClass.getName());
IndexNestedLoopJoinFunctionUpdateOperatorDescriptor join = new IndexNestedLoopJoinFunctionUpdateOperatorDescriptor(
spec, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits, comparatorFactories,
@@ -362,8 +364,8 @@
/**
* final aggregate write operator
*/
- IRecordDescriptorFactory aggRdFactory = DataflowUtils
- .getWritableRecordDescriptorFactoryFromWritableClasses(partialAggregateValueClass.getName());
+ IRecordDescriptorFactory aggRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
+ conf, partialAggregateValueClass.getName());
FinalAggregateOperatorDescriptor finalAggregator = new FinalAggregateOperatorDescriptor(spec,
configurationFactory, aggRdFactory, jobId);
PartitionConstraintHelper.addPartitionCountConstraint(spec, finalAggregator, 1);
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
index 0a38d72..3aa36cd 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
@@ -96,7 +96,7 @@
/**
* construct btree search function update operator
*/
- RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+ RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf,
vertexIdClass.getName(), vertexClass.getName());
IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
comparatorFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, vertexIdClass);
@@ -106,18 +106,19 @@
typeTraits[0] = new TypeTraits(false);
typeTraits[1] = new TypeTraits(false);
- RecordDescriptor rdDummy = DataflowUtils.getRecordDescriptorFromWritableClasses(VLongWritable.class.getName());
- RecordDescriptor rdPartialAggregate = DataflowUtils
- .getRecordDescriptorFromWritableClasses(partialAggregateValueClass.getName());
+ RecordDescriptor rdDummy = DataflowUtils.getRecordDescriptorFromWritableClasses(conf,
+ VLongWritable.class.getName());
+ RecordDescriptor rdPartialAggregate = DataflowUtils.getRecordDescriptorFromWritableClasses(conf,
+ partialAggregateValueClass.getName());
IConfigurationFactory configurationFactory = new ConfigurationFactory(conf);
IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(configurationFactory);
IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
- vertexIdClass.getName(), vertexClass.getName());
- RecordDescriptor rdUnnestedMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+ conf, vertexIdClass.getName(), vertexClass.getName());
+ RecordDescriptor rdUnnestedMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf,
vertexIdClass.getName(), messageValueClass.getName());
- RecordDescriptor rdInsert = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+ RecordDescriptor rdInsert = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf, vertexIdClass.getName(),
vertexClass.getName());
- RecordDescriptor rdDelete = DataflowUtils.getRecordDescriptorFromWritableClasses(vertexIdClass.getName());
+ RecordDescriptor rdDelete = DataflowUtils.getRecordDescriptorFromWritableClasses(conf, vertexIdClass.getName());
TreeSearchFunctionUpdateOperatorDescriptor scanner = new TreeSearchFunctionUpdateOperatorDescriptor(spec,
recordDescriptor, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
@@ -157,7 +158,7 @@
/**
* construct global group-by operator
*/
- RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+ RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf, vertexIdClass.getName(),
MsgList.class.getName());
IClusteredAggregatorDescriptorFactory aggregatorFactoryFinal = DataflowUtils.getAccumulatingAggregatorFactory(
conf, true, true);
@@ -191,8 +192,8 @@
/**
* final aggregate write operator
*/
- IRecordDescriptorFactory aggRdFactory = DataflowUtils
- .getWritableRecordDescriptorFactoryFromWritableClasses(partialAggregateValueClass.getName());
+ IRecordDescriptorFactory aggRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
+ conf, partialAggregateValueClass.getName());
FinalAggregateOperatorDescriptor finalAggregator = new FinalAggregateOperatorDescriptor(spec,
configurationFactory, aggRdFactory, jobId);
PartitionConstraintHelper.addPartitionCountConstraint(spec, finalAggregator, 1);
@@ -272,15 +273,15 @@
* source aggregate
*/
int[] keyFields = new int[] { 0 };
- RecordDescriptor rdUnnestedMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+ RecordDescriptor rdUnnestedMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf,
vertexIdClass.getName(), messageValueClass.getName());
IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
comparatorFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, vertexIdClass);
- RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+ RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf, vertexIdClass.getName(),
MsgList.class.getName());
- RecordDescriptor rdInsert = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+ RecordDescriptor rdInsert = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf, vertexIdClass.getName(),
vertexClass.getName());
- RecordDescriptor rdDelete = DataflowUtils.getRecordDescriptorFromWritableClasses(vertexIdClass.getName());
+ RecordDescriptor rdDelete = DataflowUtils.getRecordDescriptorFromWritableClasses(conf, vertexIdClass.getName());
/**
* construct empty tuple operator
@@ -314,13 +315,14 @@
nullWriterFactories[0] = VertexIdNullWriterFactory.INSTANCE;
nullWriterFactories[1] = MsgListNullWriterFactory.INSTANCE;
- RecordDescriptor rdDummy = DataflowUtils.getRecordDescriptorFromWritableClasses(VLongWritable.class.getName());
- RecordDescriptor rdPartialAggregate = DataflowUtils
- .getRecordDescriptorFromWritableClasses(partialAggregateValueClass.getName());
+ RecordDescriptor rdDummy = DataflowUtils.getRecordDescriptorFromWritableClasses(conf,
+ VLongWritable.class.getName());
+ RecordDescriptor rdPartialAggregate = DataflowUtils.getRecordDescriptorFromWritableClasses(conf,
+ partialAggregateValueClass.getName());
IConfigurationFactory configurationFactory = new ConfigurationFactory(conf);
IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(configurationFactory);
IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
- vertexIdClass.getName(), MsgList.class.getName(), vertexIdClass.getName(), vertexClass.getName());
+ conf, vertexIdClass.getName(), MsgList.class.getName(), vertexIdClass.getName(), vertexClass.getName());
IndexNestedLoopJoinFunctionUpdateOperatorDescriptor join = new IndexNestedLoopJoinFunctionUpdateOperatorDescriptor(
spec, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits, comparatorFactories,
@@ -392,8 +394,8 @@
/**
* final aggregate write operator
*/
- IRecordDescriptorFactory aggRdFactory = DataflowUtils
- .getWritableRecordDescriptorFactoryFromWritableClasses(partialAggregateValueClass.getName());
+ IRecordDescriptorFactory aggRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
+ conf, partialAggregateValueClass.getName());
FinalAggregateOperatorDescriptor finalAggregator = new FinalAggregateOperatorDescriptor(spec,
configurationFactory, aggRdFactory, jobId);
PartitionConstraintHelper.addPartitionCountConstraint(spec, finalAggregator, 1);
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/runtime/touchpoint/WritableRecordDescriptorFactory.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/runtime/touchpoint/WritableRecordDescriptorFactory.java
index a67c259..68e3ba7 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/runtime/touchpoint/WritableRecordDescriptorFactory.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/runtime/touchpoint/WritableRecordDescriptorFactory.java
@@ -14,25 +14,32 @@
*/
package edu.uci.ics.pregelix.core.runtime.touchpoint;
+import org.apache.hadoop.conf.Configuration;
+
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.pregelix.core.hadoop.config.ConfigurationFactory;
import edu.uci.ics.pregelix.core.util.DataflowUtils;
+import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
public class WritableRecordDescriptorFactory implements IRecordDescriptorFactory {
private static final long serialVersionUID = 1L;
private String[] fieldClasses;
+ private IConfigurationFactory confFactory;
- public WritableRecordDescriptorFactory(String... fieldClasses) {
+ public WritableRecordDescriptorFactory(Configuration conf, String... fieldClasses) {
this.fieldClasses = fieldClasses;
+ this.confFactory = new ConfigurationFactory(conf);
}
@Override
public RecordDescriptor createRecordDescriptor(IHyracksTaskContext ctx) throws HyracksDataException {
try {
- return DataflowUtils.getRecordDescriptorFromWritableClasses(ctx, fieldClasses);
+ Configuration conf = confFactory.createConfiguration(ctx);
+ return DataflowUtils.getRecordDescriptorFromWritableClasses(ctx, conf, fieldClasses);
} catch (HyracksException e) {
throw new HyracksDataException(e);
}
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/DataflowUtils.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/DataflowUtils.java
index 3e01109..3a2241b 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/DataflowUtils.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/DataflowUtils.java
@@ -38,14 +38,14 @@
}
@SuppressWarnings("unchecked")
- public static RecordDescriptor getRecordDescriptorFromKeyValueClasses(String className1, String className2)
- throws HyracksException {
+ public static RecordDescriptor getRecordDescriptorFromKeyValueClasses(Configuration conf, String className1,
+ String className2) throws HyracksException {
RecordDescriptor recordDescriptor = null;
try {
ClassLoader loader = DataflowUtils.class.getClassLoader();
recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor(
(Class<? extends Writable>) loader.loadClass(className1),
- (Class<? extends Writable>) loader.loadClass(className2));
+ (Class<? extends Writable>) loader.loadClass(className2), conf);
} catch (ClassNotFoundException cnfe) {
throw new HyracksException(cnfe);
}
@@ -53,15 +53,16 @@
}
@SuppressWarnings({ "unchecked", "rawtypes" })
- public static RecordDescriptor getRecordDescriptorFromWritableClasses(String... classNames) throws HyracksException {
+ public static RecordDescriptor getRecordDescriptorFromWritableClasses(Configuration conf, String... classNames)
+ throws HyracksException {
RecordDescriptor recordDescriptor = null;
ISerializerDeserializer[] serdes = new ISerializerDeserializer[classNames.length];
ClassLoader loader = DataflowUtils.class.getClassLoader();
try {
int i = 0;
for (String className : classNames)
- serdes[i++] = DatatypeHelper.createSerializerDeserializer((Class<? extends Writable>) loader
- .loadClass(className));
+ serdes[i++] = DatatypeHelper.createSerializerDeserializer(
+ (Class<? extends Writable>) loader.loadClass(className), conf);
} catch (ClassNotFoundException cnfe) {
throw new HyracksException(cnfe);
}
@@ -69,9 +70,9 @@
return recordDescriptor;
}
- public static IRecordDescriptorFactory getWritableRecordDescriptorFactoryFromWritableClasses(String... classNames)
- throws HyracksException {
- IRecordDescriptorFactory rdFactory = new WritableRecordDescriptorFactory(classNames);
+ public static IRecordDescriptorFactory getWritableRecordDescriptorFactoryFromWritableClasses(Configuration conf,
+ String... classNames) throws HyracksException {
+ IRecordDescriptorFactory rdFactory = new WritableRecordDescriptorFactory(conf, classNames);
return rdFactory;
}
@@ -85,13 +86,13 @@
}
@SuppressWarnings("unchecked")
- public static RecordDescriptor getRecordDescriptorFromKeyValueClasses(IHyracksTaskContext ctx, String className1,
- String className2) throws HyracksException {
+ public static RecordDescriptor getRecordDescriptorFromKeyValueClasses(IHyracksTaskContext ctx, Configuration conf,
+ String className1, String className2) throws HyracksException {
RecordDescriptor recordDescriptor = null;
try {
recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor((Class<? extends Writable>) ctx
.getJobletContext().loadClass(className1), (Class<? extends Writable>) ctx.getJobletContext()
- .loadClass(className2));
+ .loadClass(className2), conf);
} catch (Exception cnfe) {
throw new HyracksException(cnfe);
}
@@ -99,15 +100,17 @@
}
@SuppressWarnings({ "unchecked", "rawtypes" })
- public static RecordDescriptor getRecordDescriptorFromWritableClasses(IHyracksTaskContext ctx, String... classNames)
- throws HyracksException {
+ public static RecordDescriptor getRecordDescriptorFromWritableClasses(IHyracksTaskContext ctx, Configuration conf,
+ String... classNames) throws HyracksException {
RecordDescriptor recordDescriptor = null;
ISerializerDeserializer[] serdes = new ISerializerDeserializer[classNames.length];
try {
int i = 0;
- for (String className : classNames)
- serdes[i++] = DatatypeHelper.createSerializerDeserializer((Class<? extends Writable>) ctx
- .getJobletContext().loadClass(className));
+ for (String className : classNames) {
+ Class<? extends Writable> c = (Class<? extends Writable>) ctx.getJobletContext().loadClass(className);
+ serdes[i++] = DatatypeHelper.createSerializerDeserializer(c, conf);
+ //System.out.println("thread " + Thread.currentThread().getId() + " after creating serde " + c.getClassLoader());
+ }
} catch (Exception cnfe) {
throw new HyracksException(cnfe);
}
diff --git a/pregelix/pregelix-dataflow-std-base/pom.xml b/pregelix/pregelix-dataflow-std-base/pom.xml
index 2b435a1..77d75bf 100644
--- a/pregelix/pregelix-dataflow-std-base/pom.xml
+++ b/pregelix/pregelix-dataflow-std-base/pom.xml
@@ -90,6 +90,13 @@
</dependency>
<dependency>
<groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-hdfs-core</artifactId>
+ <version>0.2.10-SNAPSHOT</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks-storage-am-common</artifactId>
<version>0.2.10-SNAPSHOT</version>
<type>jar</type>
diff --git a/pregelix/pregelix-dataflow-std-base/src/main/java/edu/uci/ics/pregelix/dataflow/std/base/ISerializerDeserializerFactory.java b/pregelix/pregelix-dataflow-std-base/src/main/java/edu/uci/ics/pregelix/dataflow/std/base/ISerializerDeserializerFactory.java
index 1fdd0b6..894cba5 100644
--- a/pregelix/pregelix-dataflow-std-base/src/main/java/edu/uci/ics/pregelix/dataflow/std/base/ISerializerDeserializerFactory.java
+++ b/pregelix/pregelix-dataflow-std-base/src/main/java/edu/uci/ics/pregelix/dataflow/std/base/ISerializerDeserializerFactory.java
@@ -16,10 +16,12 @@
import java.io.Serializable;
+import org.apache.hadoop.conf.Configuration;
+
import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
public interface ISerializerDeserializerFactory<T> extends Serializable {
- public ISerializerDeserializer<T> getSerializerDeserializer();
+ public ISerializerDeserializer<T> getSerializerDeserializer(Configuration conf);
}
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/FunctionProxy.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/FunctionProxy.java
index e5bdb17..5579a77 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/FunctionProxy.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/FunctionProxy.java
@@ -57,10 +57,10 @@
* @throws HyracksDataException
*/
public void functionOpen() throws HyracksDataException {
+ ctxCL = Thread.currentThread().getContextClassLoader();
+ Thread.currentThread().setContextClassLoader(ctx.getJobletContext().getClassLoader());
inputRd = inputRdFactory.createRecordDescriptor(ctx);
tupleDe = new TupleDeserializer(inputRd);
- ctxCL = Thread.currentThread().getContextClassLoader();
- Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
for (IFrameWriter writer : writers) {
writer.open();
}
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileScanOperatorDescriptor.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileScanOperatorDescriptor.java
index b34879e..e16ba48 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileScanOperatorDescriptor.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileScanOperatorDescriptor.java
@@ -16,6 +16,8 @@
import java.io.DataOutput;
import java.io.IOException;
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
@@ -122,7 +124,7 @@
@SuppressWarnings("unchecked")
private void loadVertices(final IHyracksTaskContext ctx, Configuration conf, int splitId)
throws IOException, ClassNotFoundException, InterruptedException, InstantiationException,
- IllegalAccessException {
+ IllegalAccessException, NoSuchFieldException, InvocationTargetException {
ByteBuffer frame = ctx.allocateFrame();
FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
appender.reset(frame, true);
@@ -141,7 +143,11 @@
/**
* set context
*/
- Vertex.setContext(mapperContext);
+ ClassLoader cl = ctx.getJobletContext().getClassLoader();
+ Class<?> vClass = (Class<?>) cl.loadClass("edu.uci.ics.pregelix.api.graph.Vertex");
+ Field contextField = vClass.getDeclaredField("context");
+ contextField.setAccessible(true);
+ contextField.set(null, mapperContext);
/**
* empty vertex value
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileWriteOperatorDescriptor.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileWriteOperatorDescriptor.java
index f3ec40e..2087ea2 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileWriteOperatorDescriptor.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileWriteOperatorDescriptor.java
@@ -44,17 +44,20 @@
import edu.uci.ics.pregelix.api.util.BspUtils;
import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
+import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
public class VertexFileWriteOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
private static final long serialVersionUID = 1L;
private final IConfigurationFactory confFactory;
private final IRecordDescriptorFactory inputRdFactory;
+ private final IRuntimeHookFactory preHookFactory;
public VertexFileWriteOperatorDescriptor(JobSpecification spec, IConfigurationFactory confFactory,
- IRecordDescriptorFactory inputRdFactory) {
+ IRecordDescriptorFactory inputRdFactory, IRuntimeHookFactory preHookFactory) {
super(spec, 1, 0);
this.confFactory = confFactory;
this.inputRdFactory = inputRdFactory;
+ this.preHookFactory = preHookFactory;
}
@SuppressWarnings("rawtypes")
@@ -85,6 +88,8 @@
context = ctxFactory.createContext(conf, partition);
context.getConfiguration().setClassLoader(ctx.getJobletContext().getClassLoader());
try {
+ if (preHookFactory != null)
+ preHookFactory.createRuntimeHook().configure(ctx);
vertexWriter = outputFormat.createVertexWriter(context);
} catch (InterruptedException e) {
throw new HyracksDataException(e);
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
index b1d1043..a39df3c 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
@@ -201,6 +201,10 @@
}
vertex.finishCompute();
} catch (Exception e) {
+ ClassLoader cl1 = vertex.getClass().getClassLoader();
+ ClassLoader cl2 = msgContentList.get(0).getClass().getClassLoader();
+ System.out.println("cl1 " + cl1);
+ System.out.println("cl2 " + cl2);
throw new HyracksDataException(e);
}
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/DatatypeHelper.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/DatatypeHelper.java
index b121b5b..e99fcb3 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/DatatypeHelper.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/DatatypeHelper.java
@@ -17,32 +17,30 @@
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.JobConf;
import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.pregelix.api.util.ArrayListWritable;
-@SuppressWarnings("deprecation")
public class DatatypeHelper {
private static final class WritableSerializerDeserializer<T extends Writable> implements ISerializerDeserializer<T> {
private static final long serialVersionUID = 1L;
- private Class<T> clazz;
+ private final Class<T> clazz;
+ private transient Configuration conf;
private T object;
- private WritableSerializerDeserializer(Class<T> clazz) {
+ private WritableSerializerDeserializer(Class<T> clazz, Configuration conf) {
this.clazz = clazz;
+ this.conf = conf;
}
- @SuppressWarnings("unchecked")
+ @SuppressWarnings({ "unchecked", "rawtypes" })
private T createInstance() throws HyracksDataException {
// TODO remove "if", create a new WritableInstanceOperations class
// that deals with Writables that don't have public constructors
@@ -50,7 +48,11 @@
return (T) NullWritable.get();
}
try {
- return clazz.newInstance();
+ T t = clazz.newInstance();
+ if (t instanceof ArrayListWritable) {
+ ((ArrayListWritable) t).setConf(conf);
+ }
+ return t;
} catch (InstantiationException e) {
throw new HyracksDataException(e);
} catch (IllegalAccessException e) {
@@ -85,42 +87,16 @@
@SuppressWarnings({ "rawtypes", "unchecked" })
public static ISerializerDeserializer<? extends Writable> createSerializerDeserializer(
- Class<? extends Writable> fClass) {
- return new WritableSerializerDeserializer(fClass);
+ Class<? extends Writable> fClass, Configuration conf) {
+ return new WritableSerializerDeserializer(fClass, conf);
}
public static RecordDescriptor createKeyValueRecordDescriptor(Class<? extends Writable> keyClass,
- Class<? extends Writable> valueClass) {
+ Class<? extends Writable> valueClass, Configuration conf) {
@SuppressWarnings("rawtypes")
ISerializerDeserializer[] fields = new ISerializerDeserializer[2];
- fields[0] = createSerializerDeserializer(keyClass);
- fields[1] = createSerializerDeserializer(valueClass);
+ fields[0] = createSerializerDeserializer(keyClass, conf);
+ fields[1] = createSerializerDeserializer(valueClass, conf);
return new RecordDescriptor(fields);
}
-
- public static RecordDescriptor createOneFieldRecordDescriptor(Class<? extends Writable> fieldClass) {
- @SuppressWarnings("rawtypes")
- ISerializerDeserializer[] fields = new ISerializerDeserializer[1];
- fields[0] = createSerializerDeserializer(fieldClass);
- return new RecordDescriptor(fields);
- }
-
- public static JobConf map2JobConf(Map<String, String> jobConfMap) {
- JobConf jobConf;
- synchronized (Configuration.class) {
- jobConf = new JobConf();
- for (Entry<String, String> entry : jobConfMap.entrySet()) {
- jobConf.set(entry.getKey(), entry.getValue());
- }
- }
- return jobConf;
- }
-
- public static Map<String, String> jobConf2Map(JobConf jobConf) {
- Map<String, String> jobConfMap = new HashMap<String, String>();
- for (Entry<String, String> entry : jobConf) {
- jobConfMap.put(entry.getKey(), entry.getValue());
- }
- return jobConfMap;
- }
}
\ No newline at end of file
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/RuntimeHookFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/RuntimeHookFactory.java
index 5e8ac1e..3151df2 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/RuntimeHookFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/RuntimeHookFactory.java
@@ -14,6 +14,8 @@
*/
package edu.uci.ics.pregelix.runtime.touchpoint;
+import java.lang.reflect.Field;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
@@ -21,8 +23,6 @@
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.hdfs.ContextFactory;
-import edu.uci.ics.pregelix.api.graph.Vertex;
-import edu.uci.ics.pregelix.api.util.BspUtils;
import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHook;
import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
@@ -48,8 +48,12 @@
try {
TaskAttemptContext mapperContext = ctxFactory.createContext(conf, new TaskAttemptID());
mapperContext.getConfiguration().setClassLoader(ctx.getJobletContext().getClassLoader());
- Vertex.setContext(mapperContext);
- BspUtils.setDefaultConfiguration(conf);
+
+ ClassLoader cl = ctx.getJobletContext().getClassLoader();
+ Class<?> vClass = (Class<?>) cl.loadClass("edu.uci.ics.pregelix.api.graph.Vertex");
+ Field contextField = vClass.getDeclaredField("context");
+ contextField.setAccessible(true);
+ contextField.set(null, mapperContext);
} catch (Exception e) {
throw new HyracksDataException(e);
}
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/VertexIdPartitionComputerFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/VertexIdPartitionComputerFactory.java
index 6fa6434..c9b67fb 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/VertexIdPartitionComputerFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/VertexIdPartitionComputerFactory.java
@@ -16,6 +16,7 @@
import java.io.DataInputStream;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
@@ -24,30 +25,39 @@
import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
+import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
import edu.uci.ics.pregelix.dataflow.std.base.ISerializerDeserializerFactory;
public class VertexIdPartitionComputerFactory<K extends Writable, V extends Writable> implements
ITuplePartitionComputerFactory {
private static final long serialVersionUID = 1L;
private final ISerializerDeserializerFactory<K> keyIOFactory;
+ private final IConfigurationFactory confFactory;
- public VertexIdPartitionComputerFactory(ISerializerDeserializerFactory<K> keyIOFactory) {
+ public VertexIdPartitionComputerFactory(ISerializerDeserializerFactory<K> keyIOFactory,
+ IConfigurationFactory confFactory) {
this.keyIOFactory = keyIOFactory;
+ this.confFactory = confFactory;
}
public ITuplePartitionComputer createPartitioner() {
- return new ITuplePartitionComputer() {
- private final ByteBufferInputStream bbis = new ByteBufferInputStream();
- private final DataInputStream dis = new DataInputStream(bbis);
- private final ISerializerDeserializer<K> keyIO = keyIOFactory.getSerializerDeserializer();
+ try {
+ final Configuration conf = confFactory.createConfiguration();
+ return new ITuplePartitionComputer() {
+ private final ByteBufferInputStream bbis = new ByteBufferInputStream();
+ private final DataInputStream dis = new DataInputStream(bbis);
+ private final ISerializerDeserializer<K> keyIO = keyIOFactory.getSerializerDeserializer(conf);
- public int partition(IFrameTupleAccessor accessor, int tIndex, int nParts) throws HyracksDataException {
- int keyStart = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength()
- + accessor.getFieldStartOffset(tIndex, 0);
- bbis.setByteBuffer(accessor.getBuffer(), keyStart);
- K key = keyIO.deserialize(dis);
- return Math.abs(key.hashCode() % nParts);
- }
- };
+ public int partition(IFrameTupleAccessor accessor, int tIndex, int nParts) throws HyracksDataException {
+ int keyStart = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength()
+ + accessor.getFieldStartOffset(tIndex, 0);
+ bbis.setByteBuffer(accessor.getBuffer(), keyStart);
+ K key = keyIO.deserialize(dis);
+ return Math.abs(key.hashCode() % nParts);
+ }
+ };
+ } catch (Exception e) {
+ throw new IllegalStateException(e);
+ }
}
}
\ No newline at end of file
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/WritableSerializerDeserializerFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/WritableSerializerDeserializerFactory.java
index 435d081..c11ac5b 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/WritableSerializerDeserializerFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/WritableSerializerDeserializerFactory.java
@@ -14,6 +14,7 @@
*/
package edu.uci.ics.pregelix.runtime.touchpoint;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
@@ -29,7 +30,7 @@
@SuppressWarnings({ "rawtypes", "unchecked" })
@Override
- public ISerializerDeserializer getSerializerDeserializer() {
- return DatatypeHelper.createSerializerDeserializer(clazz);
+ public ISerializerDeserializer getSerializerDeserializer(Configuration conf) {
+ return DatatypeHelper.createSerializerDeserializer(clazz, conf);
}
}