svn merge -r2910:2939
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_genomix@2940 123451ca-8445-de46-9d55-352943316053
diff --git a/genomix/genomix-core/pom.xml b/genomix/genomix-core/pom.xml
index b2c7a4c..eb2a27d 100644
--- a/genomix/genomix-core/pom.xml
+++ b/genomix/genomix-core/pom.xml
@@ -235,12 +235,12 @@
</dependency>
<dependency>
<groupId>edu.uci.ics.hyracks</groupId>
- <artifactId>hyracks-hdfs</artifactId>
+ <artifactId>hyracks-hdfs-core</artifactId>
<version>0.2.3-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>edu.uci.ics.hyracks</groupId>
- <artifactId>hyracks-hdfs</artifactId>
+ <artifactId>hyracks-hdfs-core</artifactId>
<version>0.2.3-SNAPSHOT</version>
<type>test-jar</type>
<scope>test</scope>
diff --git a/genomix/genomix-core/src/main/resources/conf/cluster.properties b/genomix/genomix-core/src/main/resources/conf/cluster.properties
index 77abcf5..eabd81b 100644
--- a/genomix/genomix-core/src/main/resources/conf/cluster.properties
+++ b/genomix/genomix-core/src/main/resources/conf/cluster.properties
@@ -36,5 +36,5 @@
# Yourkit option: -agentpath:/grid/0/dev/vborkar/tools/yjp-10.0.4/bin/linux-x86-64/libyjpagent.so=port=20001"
#NC JAVA_OPTS
-NCJAVA_OPTS="-Xdebug -Xrunjdwp:transport=dt_socket,address=7002,server=y,suspend=n -Xmx1g -Djava.util.logging.config.file=logging.properties"
+NCJAVA_OPTS="-Xdebug -Xrunjdwp:transport=dt_socket,address=7002,server=y,suspend=n -Xmx10g -Djava.util.logging.config.file=logging.properties"
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoin.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoin.java
index 3e5e30f..d86f1d5 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoin.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoin.java
@@ -17,7 +17,6 @@
import java.io.DataOutput;
import java.nio.ByteBuffer;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.List;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
@@ -47,11 +46,18 @@
private final ISerializableTable table;
private final int tableSize;
private final TuplePointer storedTuplePointer;
+    private final boolean reverseOutputOrder; // whether to reverse the probe/build order of the tuples written to the output
public InMemoryHashJoin(IHyracksTaskContext ctx, int tableSize, FrameTupleAccessor accessor0,
ITuplePartitionComputer tpc0, FrameTupleAccessor accessor1, ITuplePartitionComputer tpc1,
FrameTuplePairComparator comparator, boolean isLeftOuter, INullWriter[] nullWriters1, ISerializableTable table)
throws HyracksDataException {
+ this(ctx, tableSize, accessor0, tpc0, accessor1, tpc1, comparator, isLeftOuter, nullWriters1, table, false);
+ }
+
+ public InMemoryHashJoin(IHyracksTaskContext ctx, int tableSize, FrameTupleAccessor accessor0,
+ ITuplePartitionComputer tpc0, FrameTupleAccessor accessor1, ITuplePartitionComputer tpc1,
+ FrameTuplePairComparator comparator, boolean isLeftOuter, INullWriter[] nullWriters1, ISerializableTable table, boolean reverse) throws HyracksDataException {
this.tableSize = tableSize;
this.table = table;
storedTuplePointer = new TuplePointer();
@@ -76,6 +82,7 @@
} else {
nullTupleBuild = null;
}
+ reverseOutputOrder = reverse;
}
public void build(ByteBuffer buffer) throws HyracksDataException {
@@ -108,18 +115,13 @@
int c = tpComparator.compare(accessorProbe, i, accessorBuild, tIndex);
if (c == 0) {
matchFound = true;
- if (!appender.appendConcat(accessorProbe, i, accessorBuild, tIndex)) {
- flushFrame(outBuffer, writer);
- appender.reset(outBuffer, true);
- if (!appender.appendConcat(accessorProbe, i, accessorBuild, tIndex)) {
- throw new IllegalStateException();
- }
- }
+ appendToResult(i, tIndex, writer);
}
} while (true);
if (!matchFound && isLeftOuter) {
- if (!appender.appendConcat(accessorProbe, i, nullTupleBuild.getFieldEndOffsets(),
+
+ if (!appender.appendConcat(accessorProbe, i, nullTupleBuild.getFieldEndOffsets(),
nullTupleBuild.getByteArray(), 0, nullTupleBuild.getSize())) {
flushFrame(outBuffer, writer);
appender.reset(outBuffer, true);
@@ -128,6 +130,7 @@
throw new IllegalStateException();
}
}
+
}
}
}
@@ -145,23 +148,25 @@
buffer.position(0);
buffer.limit(buffer.capacity());
}
-
- private static class Link {
- private static final int INIT_POINTERS_SIZE = 8;
-
- long[] pointers;
- int size;
-
- Link() {
- pointers = new long[INIT_POINTERS_SIZE];
- size = 0;
- }
-
- void add(long pointer) {
- if (size >= pointers.length) {
- pointers = Arrays.copyOf(pointers, pointers.length * 2);
+
+ private void appendToResult(int probeSidetIx, int buildSidetIx, IFrameWriter writer) throws HyracksDataException{
+ if(!reverseOutputOrder){
+ if (!appender.appendConcat(accessorProbe, probeSidetIx, accessorBuild, buildSidetIx)) {
+ flushFrame(outBuffer, writer);
+ appender.reset(outBuffer, true);
+ if (!appender.appendConcat(accessorProbe, probeSidetIx, accessorBuild, buildSidetIx)) {
+ throw new IllegalStateException();
+ }
}
- pointers[size++] = pointer;
- }
+ }
+ else{
+ if (!appender.appendConcat(accessorBuild, buildSidetIx, accessorProbe, probeSidetIx)) {
+ flushFrame(outBuffer, writer);
+ appender.reset(outBuffer, true);
+ if (!appender.appendConcat(accessorBuild, buildSidetIx, accessorProbe, probeSidetIx)) {
+ throw new IllegalStateException();
+ }
+ }
+ }
}
}
\ No newline at end of file
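
Note on InMemoryHashJoin: appendToResult() now centralizes the append/flush/retry idiom, and when reverseOutputOrder is set it concatenates the build-side tuple before the probe-side one, so the output field order stays fixed when the hybrid hash join reverses roles. A minimal sketch of that idiom in isolation (Appender is a hypothetical stand-in for FrameTupleAppender):

    // Hypothetical stand-in for FrameTupleAppender: append() returns false
    // when the current output frame has no room left.
    interface Appender {
        boolean append(byte[] tuple);
        void reset(); // start a fresh output frame
    }

    // Try to append; on a full frame, flush it, reset, and retry exactly once.
    // A second failure means the tuple cannot fit in any single frame.
    static void appendOrFlush(Appender appender, Runnable flushFrame, byte[] tuple) {
        if (!appender.append(tuple)) {
            flushFrame.run();
            appender.reset();
            if (!appender.append(tuple)) {
                throw new IllegalStateException("tuple larger than one frame");
            }
        }
    }
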
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
index 19479df..cf39416 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
@@ -31,6 +31,7 @@
import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;
import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparator;
import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
@@ -135,6 +136,7 @@
recordDescriptors[0] = recordDescriptor;
this.isLeftOuter = isLeftOuter;
this.nullWriterFactories1 = nullWriterFactories1;
+
}
@@ -254,6 +256,7 @@
comparators[i] = comparatorFactories[i].createBinaryComparator();
}
+
IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
private BuildAndPartitionTaskState state = new BuildAndPartitionTaskState(ctx.getJobletContext()
.getJobId(), new TaskId(getActivityId(), partition));
@@ -421,34 +424,40 @@
//Apply in-Mem HJ if possible
if ((buildPartSize < state.memForJoin) || (probePartSize < state.memForJoin)) {
int tabSize = -1;
- if (buildPartSize < probePartSize) {
+
+ if (isLeftOuter || buildPartSize < probePartSize) {
tabSize = ohhj.getBuildPartitionSizeInTup(pid);
+
if (tabSize == 0) {
throw new HyracksDataException(
"Trying to join an empty partition. Invalid table size for inMemoryHashJoin.");
}
- //Build Side is smaller
- applyInMemHashJoin(probeKeys, buildKeys, tabSize, probeRd, buildRd, hpcRep1, hpcRep0,
- buildSideReader, probeSideReader);
+ //Build Side is smaller
+ applyInMemHashJoin(buildKeys, probeKeys, tabSize, probeRd, buildRd, hpcRep0, hpcRep1,
+ buildSideReader, probeSideReader, false, pid);
- } else { //Role Reversal
+ }
+
+ else { //Role Reversal
tabSize = ohhj.getProbePartitionSizeInTup(pid);
if (tabSize == 0) {
throw new HyracksDataException(
"Trying to join an empty partition. Invalid table size for inMemoryHashJoin.");
}
//Probe Side is smaller
- applyInMemHashJoin(buildKeys, probeKeys, tabSize, buildRd, probeRd, hpcRep0, hpcRep1,
- probeSideReader, buildSideReader);
+
+ applyInMemHashJoin(probeKeys, buildKeys, tabSize, buildRd, probeRd, hpcRep1, hpcRep0,
+ probeSideReader, buildSideReader, true, pid);
}
}
//Apply (Recursive) HHJ
else {
OptimizedHybridHashJoin rHHj;
- if (buildPartSize < probePartSize) { //Build Side is smaller
+ if (isLeftOuter || buildPartSize < probePartSize) { //Build Side is smaller
int n = getNumberOfPartitions(state.memForJoin, (int) buildPartSize, fudgeFactor,
nPartitions);
+
rHHj = new OptimizedHybridHashJoin(ctx, state.memForJoin, n, PROBE_REL, BUILD_REL,
probeKeys, buildKeys, comparators, probeRd, buildRd, probeHpc, buildHpc);
@@ -491,14 +500,14 @@
for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) {
RunFileReader rbrfw = rHHj.getBuildRFReader(rPid);
RunFileReader rprfw = rHHj.getProbeRFReader(rPid);
-
+
if (rbrfw == null || rprfw == null) {
continue;
}
int buildSideInTups = rHHj.getBuildPartitionSizeInTup(rPid);
int probeSideInTups = rHHj.getProbePartitionSizeInTup(rPid);
- if (buildSideInTups < probeSideInTups) {
+ if (isLeftOuter || buildSideInTups < probeSideInTups) {
applyNestedLoopJoin(probeRd, buildRd, state.memForJoin, rbrfw, rprfw,
nljComparator0);
} else {
@@ -510,6 +519,7 @@
} else { //Role Reversal (Probe Side is smaller)
int n = getNumberOfPartitions(state.memForJoin, (int) probePartSize, fudgeFactor,
nPartitions);
+
rHHj = new OptimizedHybridHashJoin(ctx, state.memForJoin, n, BUILD_REL, PROBE_REL,
buildKeys, probeKeys, comparators, buildRd, probeRd, buildHpc, probeHpc);
@@ -548,7 +558,7 @@
for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) {
RunFileReader rbrfw = rHHj.getBuildRFReader(rPid);
RunFileReader rprfw = rHHj.getProbeRFReader(rPid);
-
+
if (rbrfw == null || rprfw == null) {
continue;
}
@@ -572,14 +582,14 @@
private void applyInMemHashJoin(int[] bKeys, int[] pKeys, int tabSize, RecordDescriptor buildRDesc,
RecordDescriptor probeRDesc, ITuplePartitionComputer hpcRepLarger,
- ITuplePartitionComputer hpcRepSmaller, RunFileReader bReader, RunFileReader pReader)
+ ITuplePartitionComputer hpcRepSmaller, RunFileReader bReader, RunFileReader pReader, boolean reverse, int pid)
throws HyracksDataException {
ISerializableTable table = new SerializableHashTable(tabSize, ctx);
InMemoryHashJoin joiner = new InMemoryHashJoin(ctx, tabSize, new FrameTupleAccessor(
ctx.getFrameSize(), probeRDesc), hpcRepLarger, new FrameTupleAccessor(ctx.getFrameSize(),
buildRDesc), hpcRepSmaller, new FrameTuplePairComparator(pKeys, bKeys, comparators),
- isLeftOuter, nullWriters1, table);
+ isLeftOuter, nullWriters1, table, reverse);
bReader.open();
rPartbuff.clear();
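
Note on OptimizedHybridHashJoinOperatorDescriptor: the added isLeftOuter guards disable role reversal for left outer joins. Null padding is wired to the build side, so for an outer join the build relation must stay the hashed side even when it is the larger one; the smaller-side swap now applies to inner joins only. The selection rule, roughly:

    // Sketch of the side-selection rule after this change (names abridged).
    boolean hashBuildSide = isLeftOuter || buildPartSize < probePartSize;
    if (hashBuildSide) {
        // hash the build partition, stream the probe partition (normal roles)
    } else {
        // role reversal: hash the probe partition and stream the build one,
        // passing reverse == true to InMemoryHashJoin so the output field
        // order matches the non-reversed case
    }
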
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-0.20.2/pom.xml b/hyracks/hyracks-hdfs/hyracks-hdfs-0.20.2/pom.xml
new file mode 100644
index 0000000..e74d610
--- /dev/null
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-0.20.2/pom.xml
@@ -0,0 +1,103 @@
+<?xml version="1.0"?>
+<project
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
+ xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+ <modelVersion>4.0.0</modelVersion>
+ <artifactId>hyracks-hdfs-0.20.2</artifactId>
+ <name>hyracks-hdfs-0.20.2</name>
+ <parent>
+ <artifactId>hyracks-hdfs</artifactId>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <version>0.2.3-SNAPSHOT</version>
+ </parent>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>2.0.2</version>
+ <configuration>
+ <source>1.6</source>
+ <target>1.6</target>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <version>2.7.2</version>
+ <configuration>
+ <forkMode>pertest</forkMode>
+ <includes>
+ <include>**/*TestSuite.java</include>
+ <include>**/*Test.java</include>
+ </includes>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
+ <profiles>
+ <profile>
+ <activation>
+ <activeByDefault>true</activeByDefault>
+ </activation>
+ <id>hadoop-0.20.2</id>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-core</artifactId>
+ <version>0.20.2</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-test</artifactId>
+ <version>0.20.2</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ </dependencies>
+ </profile>
+ <profile>
+ <activation>
+ <activeByDefault>false</activeByDefault>
+ </activation>
+ <id>hadoop-1.0.4</id>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-core</artifactId>
+ <version>1.0.4</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-minicluster</artifactId>
+ <version>1.0.4</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-test</artifactId>
+ <version>1.0.4</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ </dependencies>
+ </profile>
+ </profiles>
+
+ <dependencies>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-api</artifactId>
+ <version>0.2.3-SNAPSHOT</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ </dependencies>
+</project>
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-0.20.2/src/main/java/edu/uci/ics/hyracks/hdfs/ContextFactory.java b/hyracks/hyracks-hdfs/hyracks-hdfs-0.20.2/src/main/java/edu/uci/ics/hyracks/hdfs/ContextFactory.java
new file mode 100644
index 0000000..a2b16c6
--- /dev/null
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-0.20.2/src/main/java/edu/uci/ics/hyracks/hdfs/ContextFactory.java
@@ -0,0 +1,25 @@
+package edu.uci.ics.hyracks.hdfs;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * The wrapper that generates a TaskAttemptContext
+ */
+public class ContextFactory {
+
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ public TaskAttemptContext createContext(Configuration conf, InputSplit split) throws HyracksDataException {
+ try {
+ return new Mapper().new Context(conf, new TaskAttemptID(), null, null, null, null, split);
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+}
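
Note on ContextFactory: this is the per-Hadoop-version shim. Each hyracks-hdfs-0.x module ships a class with the same package and method signature, and hyracks-hdfs-core compiles against whichever one the active Maven profile selects (in 0.23.x, TaskAttemptContext became an interface, so the 0.20-style "new Mapper().new Context(...)" trick no longer compiles there). Caller code stays version-agnostic; a sketch of the call site, assuming 'job', 'split', and 'inputFormat' exist in the surrounding operator code (cf. HDFSReadOperatorDescriptor):

    ContextFactory ctxFactory = new ContextFactory();
    TaskAttemptContext context = ctxFactory.createContext(job.getConfiguration(), split);
    RecordReader<?, ?> reader = inputFormat.createRecordReader(split, context);
    reader.initialize(split, context);
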
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-0.20.2/src/main/java/edu/uci/ics/hyracks/hdfs/MiniDFSClusterFactory.java b/hyracks/hyracks-hdfs/hyracks-hdfs-0.20.2/src/main/java/edu/uci/ics/hyracks/hdfs/MiniDFSClusterFactory.java
new file mode 100644
index 0000000..9133d35
--- /dev/null
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-0.20.2/src/main/java/edu/uci/ics/hyracks/hdfs/MiniDFSClusterFactory.java
@@ -0,0 +1,17 @@
+package edu.uci.ics.hyracks.hdfs;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public class MiniDFSClusterFactory {
+
+ public MiniDFSCluster getMiniDFSCluster(Configuration conf, int numberOfNC) throws HyracksDataException {
+ try {
+ return new MiniDFSCluster(conf, numberOfNC, true, null);
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ }
+}
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-0.23.1/pom.xml b/hyracks/hyracks-hdfs/hyracks-hdfs-0.23.1/pom.xml
new file mode 100644
index 0000000..27a1e33
--- /dev/null
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-0.23.1/pom.xml
@@ -0,0 +1,123 @@
+<?xml version="1.0"?>
+<project
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
+ xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+ <modelVersion>4.0.0</modelVersion>
+ <artifactId>hyracks-hdfs-0.23.1</artifactId>
+ <name>hyracks-hdfs-0.23.1</name>
+ <parent>
+ <artifactId>hyracks-hdfs</artifactId>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <version>0.2.3-SNAPSHOT</version>
+ </parent>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>2.0.2</version>
+ <configuration>
+ <source>1.6</source>
+ <target>1.6</target>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <version>2.7.2</version>
+ <configuration>
+ <forkMode>pertest</forkMode>
+ <includes>
+ <include>**/*TestSuite.java</include>
+ <include>**/*Test.java</include>
+ </includes>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
+ <profiles>
+ <profile>
+ <activation>
+ <activeByDefault>true</activeByDefault>
+ </activation>
+ <id>hadoop-0.23.1</id>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <version>0.23.1</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-core</artifactId>
+ <version>0.23.1</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ <version>0.23.1</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-minicluster</artifactId>
+ <version>0.23.1</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ </dependencies>
+ </profile>
+ <profile>
+ <id>hadoop-0.23.6</id>
+ <activation>
+ <activeByDefault>false</activeByDefault>
+ </activation>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <version>0.23.6</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-core</artifactId>
+ <version>0.23.6</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ <version>0.23.6</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-minicluster</artifactId>
+ <version>0.23.6</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ </dependencies>
+ </profile>
+ </profiles>
+
+ <dependencies>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-api</artifactId>
+ <version>0.2.3-SNAPSHOT</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ </dependencies>
+</project>
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-0.23.1/src/main/java/edu/uci/ics/hyracks/hdfs/ContextFactory.java b/hyracks/hyracks-hdfs/hyracks-hdfs-0.23.1/src/main/java/edu/uci/ics/hyracks/hdfs/ContextFactory.java
new file mode 100644
index 0000000..60ae5d3
--- /dev/null
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-0.23.1/src/main/java/edu/uci/ics/hyracks/hdfs/ContextFactory.java
@@ -0,0 +1,24 @@
+package edu.uci.ics.hyracks.hdfs;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * The wrapper that generates a TaskAttemptContext
+ */
+public class ContextFactory {
+
+ public TaskAttemptContext createContext(Configuration conf, InputSplit split) throws HyracksDataException {
+ try {
+ return new TaskAttemptContextImpl(conf, new TaskAttemptID());
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+}
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-0.23.1/src/main/java/edu/uci/ics/hyracks/hdfs/MiniDFSClusterFactory.java b/hyracks/hyracks-hdfs/hyracks-hdfs-0.23.1/src/main/java/edu/uci/ics/hyracks/hdfs/MiniDFSClusterFactory.java
new file mode 100644
index 0000000..ded75f1
--- /dev/null
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-0.23.1/src/main/java/edu/uci/ics/hyracks/hdfs/MiniDFSClusterFactory.java
@@ -0,0 +1,20 @@
+package edu.uci.ics.hyracks.hdfs;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public class MiniDFSClusterFactory {
+
+ public MiniDFSCluster getMiniDFSCluster(Configuration conf, int numberOfNC) throws HyracksDataException {
+ try {
+ MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf);
+ builder.numDataNodes(numberOfNC);
+ MiniDFSCluster dfsCluster = builder.build();
+ return dfsCluster;
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ }
+}
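
Note on MiniDFSClusterFactory: the two variants paper over an incompatible test API. Hadoop 0.20.x offers the MiniDFSCluster(conf, numDataNodes, format, racks) constructor, while 0.23.x switches to the MiniDFSCluster.Builder API. Tests in hyracks-hdfs-core depend only on the factory; a sketch of version-agnostic setup (cf. DataflowTest):

    Configuration conf = new Configuration();
    MiniDFSCluster dfsCluster = new MiniDFSClusterFactory().getMiniDFSCluster(conf, 2);
    try {
        // exercise the cluster: FileSystem dfs = FileSystem.get(conf); ...
    } finally {
        dfsCluster.shutdown(); // tear the mini cluster down either way
    }
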
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/pom.xml b/hyracks/hyracks-hdfs/hyracks-hdfs-core/pom.xml
new file mode 100644
index 0000000..8b99cad
--- /dev/null
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/pom.xml
@@ -0,0 +1,178 @@
+<?xml version="1.0"?>
+<project
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
+ xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+ <modelVersion>4.0.0</modelVersion>
+ <artifactId>hyracks-hdfs-core</artifactId>
+ <name>hyracks-hdfs-core</name>
+ <parent>
+ <artifactId>hyracks-hdfs</artifactId>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <version>0.2.3-SNAPSHOT</version>
+ </parent>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>2.0.2</version>
+ <configuration>
+ <source>1.6</source>
+ <target>1.6</target>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <version>2.7.2</version>
+ <configuration>
+ <forkMode>pertest</forkMode>
+ <includes>
+ <include>**/*TestSuite.java</include>
+ <include>**/*Test.java</include>
+ </includes>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-clean-plugin</artifactId>
+ <version>2.5</version>
+ <configuration>
+ <filesets>
+ <fileset>
+ <directory>.</directory>
+ <includes>
+ <include>edu*</include>
+ <include>actual*</include>
+ <include>build*</include>
+ <include>expect*</include>
+ <include>ClusterController*</include>
+ <include>edu.uci.*</include>
+ </includes>
+ </fileset>
+ </filesets>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <version>2.2</version>
+ <executions>
+ <execution>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+ <profiles>
+ <profile>
+ <activation>
+ <activeByDefault>true</activeByDefault>
+ </activation>
+ <id>hadoop-0.20.2</id>
+ <dependencies>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-hdfs-0.20.2</artifactId>
+ <version>0.2.3-SNAPSHOT</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ </dependencies>
+ </profile>
+ <profile>
+ <activation>
+ <activeByDefault>false</activeByDefault>
+ </activation>
+ <id>hadoop-1.0.4</id>
+ <dependencies>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-hdfs-0.20.2</artifactId>
+ <version>0.2.3-SNAPSHOT</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ </dependencies>
+ </profile>
+ <profile>
+ <activation>
+ <activeByDefault>false</activeByDefault>
+ </activation>
+ <id>hadoop-0.23.1</id>
+ <dependencies>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-hdfs-0.23.1</artifactId>
+ <version>0.2.3-SNAPSHOT</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ </dependencies>
+ </profile>
+ <profile>
+ <activation>
+ <activeByDefault>false</activeByDefault>
+ </activation>
+ <id>hadoop-0.23.6</id>
+ <dependencies>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-hdfs-0.23.1</artifactId>
+ <version>0.2.3-SNAPSHOT</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ </dependencies>
+ </profile>
+ </profiles>
+
+ <dependencies>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>3.8.1</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-api</artifactId>
+ <version>0.2.3-SNAPSHOT</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-dataflow-std</artifactId>
+ <version>0.2.3-SNAPSHOT</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-dataflow-common</artifactId>
+ <version>0.2.3-SNAPSHOT</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-control-cc</artifactId>
+ <version>0.2.3-SNAPSHOT</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-control-nc</artifactId>
+ <version>0.2.3-SNAPSHOT</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-io</artifactId>
+ <version>1.3.2</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+</project>
diff --git a/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs/api/IKeyValueParser.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/IKeyValueParser.java
similarity index 100%
rename from hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs/api/IKeyValueParser.java
rename to hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/IKeyValueParser.java
diff --git a/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs/api/IKeyValueParserFactory.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/IKeyValueParserFactory.java
similarity index 100%
rename from hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs/api/IKeyValueParserFactory.java
rename to hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/IKeyValueParserFactory.java
diff --git a/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs/api/ITupleWriter.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/ITupleWriter.java
similarity index 100%
rename from hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs/api/ITupleWriter.java
rename to hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/ITupleWriter.java
diff --git a/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs/api/ITupleWriterFactory.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/ITupleWriterFactory.java
similarity index 100%
rename from hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs/api/ITupleWriterFactory.java
rename to hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/ITupleWriterFactory.java
diff --git a/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/ConfFactory.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/ConfFactory.java
similarity index 100%
rename from hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/ConfFactory.java
rename to hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/ConfFactory.java
diff --git a/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSReadOperatorDescriptor.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSReadOperatorDescriptor.java
similarity index 100%
rename from hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSReadOperatorDescriptor.java
rename to hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSReadOperatorDescriptor.java
diff --git a/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSWriteOperatorDescriptor.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSWriteOperatorDescriptor.java
similarity index 100%
rename from hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSWriteOperatorDescriptor.java
rename to hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSWriteOperatorDescriptor.java
diff --git a/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/InputSplitsFactory.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/InputSplitsFactory.java
similarity index 100%
rename from hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/InputSplitsFactory.java
rename to hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/InputSplitsFactory.java
diff --git a/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs/lib/RawBinaryComparatorFactory.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/lib/RawBinaryComparatorFactory.java
similarity index 100%
rename from hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs/lib/RawBinaryComparatorFactory.java
rename to hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/lib/RawBinaryComparatorFactory.java
diff --git a/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs/lib/RawBinaryHashFunctionFactory.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/lib/RawBinaryHashFunctionFactory.java
similarity index 100%
rename from hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs/lib/RawBinaryHashFunctionFactory.java
rename to hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/lib/RawBinaryHashFunctionFactory.java
diff --git a/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs/lib/TextKeyValueParserFactory.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/lib/TextKeyValueParserFactory.java
similarity index 100%
rename from hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs/lib/TextKeyValueParserFactory.java
rename to hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/lib/TextKeyValueParserFactory.java
diff --git a/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs/lib/TextTupleWriterFactory.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/lib/TextTupleWriterFactory.java
similarity index 100%
rename from hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs/lib/TextTupleWriterFactory.java
rename to hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/lib/TextTupleWriterFactory.java
diff --git a/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs/scheduler/Scheduler.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/scheduler/Scheduler.java
similarity index 100%
rename from hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs/scheduler/Scheduler.java
rename to hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/scheduler/Scheduler.java
diff --git a/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/ConfFactory.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/ConfFactory.java
similarity index 100%
rename from hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/ConfFactory.java
rename to hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/ConfFactory.java
diff --git a/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/FileSplitsFactory.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/FileSplitsFactory.java
similarity index 100%
rename from hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/FileSplitsFactory.java
rename to hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/FileSplitsFactory.java
diff --git a/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSReadOperatorDescriptor.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSReadOperatorDescriptor.java
similarity index 86%
rename from hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSReadOperatorDescriptor.java
rename to hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSReadOperatorDescriptor.java
index b000c75..90f5603 100644
--- a/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSReadOperatorDescriptor.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSReadOperatorDescriptor.java
@@ -24,7 +24,6 @@
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.util.ReflectionUtils;
@@ -37,12 +36,13 @@
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
+import edu.uci.ics.hyracks.hdfs.ContextFactory;
import edu.uci.ics.hyracks.hdfs.api.IKeyValueParser;
import edu.uci.ics.hyracks.hdfs.api.IKeyValueParserFactory;
/**
- * The HDFS file read operator using the Hadoop new API.
- * To use this operator, a user need to provide an IKeyValueParserFactory implementation which convert
+ * The HDFS file read operator using the Hadoop new API. To use this operator, a
+ * user needs to provide an IKeyValueParserFactory implementation which converts
+ * key-value pairs into tuples.
*/
@SuppressWarnings("rawtypes")
@@ -63,12 +63,16 @@
* @param rd
* the output record descriptor
* @param conf
- * the Hadoop JobConf object, which contains the input format and the input paths
+ * the Hadoop JobConf object, which contains the input format and
+ * the input paths
* @param splits
* the array of FileSplits (HDFS chunks).
* @param scheduledLocations
- * the node controller names to scan the FileSplits, which is an one-to-one mapping. The String array
- * is obtained from the edu.cui.ics.hyracks.hdfs.scheduler.Scheduler.getLocationConstraints(InputSplits[]).
+ * the node controller names to scan the FileSplits, which is a
+ * one-to-one mapping. The String array is obtained from
+ * edu.uci.ics.hyracks.hdfs.scheduler.Scheduler.getLocationConstraints(InputSplits[]).
* @param tupleParserFactory
* the ITupleParserFactory implementation instance.
* @throws HyracksException
@@ -101,6 +105,7 @@
return new AbstractUnaryOutputSourceOperatorNodePushable() {
private String nodeName = ctx.getJobletContext().getApplicationContext().getNodeId();
+ private ContextFactory ctxFactory = new ContextFactory();
@SuppressWarnings("unchecked")
@Override
@@ -108,11 +113,11 @@
ClassLoader ctxCL = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
- Job conf = confFactory.getConf();
+ Job job = confFactory.getConf();
IKeyValueParser parser = tupleParserFactory.createKeyValueParser(ctx);
writer.open();
- InputFormat inputFormat = ReflectionUtils.newInstance(conf.getInputFormatClass(),
- conf.getConfiguration());
+ InputFormat inputFormat = ReflectionUtils.newInstance(job.getInputFormatClass(),
+ job.getConfiguration());
int size = inputSplits.size();
for (int i = 0; i < size; i++) {
/**
@@ -120,8 +125,8 @@
*/
if (scheduledLocations[i].equals(nodeName)) {
/**
- * pick an unread split to read
- * synchronize among simultaneous partitions in the same machine
+ * pick an unread split to read; synchronize among
+ * simultaneous partitions on the same machine
*/
synchronized (executed) {
if (executed[i] == false) {
@@ -134,8 +139,8 @@
/**
* read the split
*/
- TaskAttemptContext context = new TaskAttemptContext(conf.getConfiguration(),
- new TaskAttemptID());
+ TaskAttemptContext context = ctxFactory.createContext(job.getConfiguration(),
+ inputSplits.get(i));
RecordReader reader = inputFormat.createRecordReader(inputSplits.get(i), context);
reader.initialize(inputSplits.get(i), context);
while (reader.nextKeyValue() == true) {
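
Note on the split-claiming loop above: scheduledLocations can assign several splits to one node, and several partitions of the operator run on that node concurrently, so each thread claims a split by flipping its executed[] flag inside a synchronized block before reading it. The guard in isolation:

    // Sketch of the claim-before-read guard; 'executed' is a shared boolean[]
    // with one flag per input split, visible to all co-located partitions.
    boolean claimed = false;
    synchronized (executed) {
        if (!executed[i]) {
            executed[i] = true; // this thread owns split i now
            claimed = true;
        }
    }
    if (claimed) {
        // only the claiming thread opens a RecordReader for split i
    }
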
diff --git a/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSWriteOperatorDescriptor.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSWriteOperatorDescriptor.java
similarity index 95%
rename from hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSWriteOperatorDescriptor.java
rename to hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSWriteOperatorDescriptor.java
index 87c1c47..390a7b5 100644
--- a/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSWriteOperatorDescriptor.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSWriteOperatorDescriptor.java
@@ -22,8 +22,6 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
@@ -87,8 +85,7 @@
ctxCL = Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
Job conf = confFactory.getConf();
- String outputPath = FileOutputFormat
- .getOutputPath(new JobContext(conf.getConfiguration(), new JobID())).toString();
+ String outputPath = FileOutputFormat.getOutputPath(conf).toString();
String fileName = outputPath + File.separator + "part-" + partition;
tupleWriter = tupleWriterFactory.getTupleWriter();
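
Note on the HDFSWriteOperatorDescriptor change: "new JobContext(conf.getConfiguration(), new JobID())" only compiles against 0.20.x, where JobContext is a concrete class; in 0.23.x it is an interface. Since a Job is itself a JobContext in both API generations, the output path can be read straight off the Job ('conf' below is the Job instance, as in the hunk above):

    // Before (0.20-only): JobContext instantiated directly.
    //   String outputPath = FileOutputFormat.getOutputPath(
    //           new JobContext(conf.getConfiguration(), new JobID())).toString();
    // After (both API generations): pass the Job, which is a JobContext.
    String outputPath = FileOutputFormat.getOutputPath(conf).toString();
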
diff --git a/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs2/scheduler/Scheduler.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/scheduler/Scheduler.java
similarity index 100%
rename from hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs2/scheduler/Scheduler.java
rename to hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/scheduler/Scheduler.java
diff --git a/hyracks/hyracks-hdfs/src/test/java/edu/uci/ics/hyracks/hdfs/dataflow/DataflowTest.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs/dataflow/DataflowTest.java
similarity index 100%
rename from hyracks/hyracks-hdfs/src/test/java/edu/uci/ics/hyracks/hdfs/dataflow/DataflowTest.java
rename to hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs/dataflow/DataflowTest.java
diff --git a/hyracks/hyracks-hdfs/src/test/java/edu/uci/ics/hyracks/hdfs/scheduler/SchedulerTest.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs/scheduler/SchedulerTest.java
similarity index 100%
rename from hyracks/hyracks-hdfs/src/test/java/edu/uci/ics/hyracks/hdfs/scheduler/SchedulerTest.java
rename to hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs/scheduler/SchedulerTest.java
diff --git a/hyracks/hyracks-hdfs/src/test/java/edu/uci/ics/hyracks/hdfs/utils/HyracksUtils.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs/utils/HyracksUtils.java
similarity index 100%
rename from hyracks/hyracks-hdfs/src/test/java/edu/uci/ics/hyracks/hdfs/utils/HyracksUtils.java
rename to hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs/utils/HyracksUtils.java
diff --git a/hyracks/hyracks-hdfs/src/test/java/edu/uci/ics/hyracks/hdfs/utils/TestUtils.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs/utils/TestUtils.java
similarity index 100%
rename from hyracks/hyracks-hdfs/src/test/java/edu/uci/ics/hyracks/hdfs/utils/TestUtils.java
rename to hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs/utils/TestUtils.java
diff --git a/hyracks/hyracks-hdfs/src/test/java/edu/uci/ics/hyracks/hdfs2/dataflow/DataflowTest.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs2/dataflow/DataflowTest.java
similarity index 96%
rename from hyracks/hyracks-hdfs/src/test/java/edu/uci/ics/hyracks/hdfs2/dataflow/DataflowTest.java
rename to hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs2/dataflow/DataflowTest.java
index 508ba07..9f77979 100644
--- a/hyracks/hyracks-hdfs/src/test/java/edu/uci/ics/hyracks/hdfs2/dataflow/DataflowTest.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs2/dataflow/DataflowTest.java
@@ -32,8 +32,6 @@
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
@@ -53,6 +51,7 @@
import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
+import edu.uci.ics.hyracks.hdfs.MiniDFSClusterFactory;
import edu.uci.ics.hyracks.hdfs.lib.RawBinaryComparatorFactory;
import edu.uci.ics.hyracks.hdfs.lib.RawBinaryHashFunctionFactory;
import edu.uci.ics.hyracks.hdfs.lib.TextKeyValueParserFactory;
@@ -78,6 +77,7 @@
private static final String HYRACKS_APP_NAME = "DataflowTest";
private static final String HADOOP_CONF_PATH = ACTUAL_RESULT_DIR + File.separator + "conf.xml";
private MiniDFSCluster dfsCluster;
+ private MiniDFSClusterFactory dfsClusterFactory = new MiniDFSClusterFactory();
private Job conf;
private int numberOfNC = 2;
@@ -113,7 +113,7 @@
FileSystem lfs = FileSystem.getLocal(new Configuration());
lfs.delete(new Path("build"), true);
System.setProperty("hadoop.log.dir", "logs");
- dfsCluster = new MiniDFSCluster(conf.getConfiguration(), numberOfNC, true, null);
+ dfsCluster = dfsClusterFactory.getMiniDFSCluster(conf.getConfiguration(), numberOfNC);
FileSystem dfs = FileSystem.get(conf.getConfiguration());
Path src = new Path(DATA_PATH);
Path dest = new Path(HDFS_INPUT_PATH);
@@ -141,7 +141,7 @@
Scheduler scheduler = new Scheduler(HyracksUtils.CC_HOST, HyracksUtils.TEST_HYRACKS_CC_CLIENT_PORT);
InputFormat inputFormat = ReflectionUtils.newInstance(conf.getInputFormatClass(), conf.getConfiguration());
- List<InputSplit> splits = inputFormat.getSplits(new JobContext(conf.getConfiguration(), new JobID()));
+ List<InputSplit> splits = inputFormat.getSplits(conf);
String[] readSchedule = scheduler.getLocationConstraints(splits);
JobSpecification jobSpec = new JobSpecification();
diff --git a/hyracks/hyracks-hdfs/src/test/java/edu/uci/ics/hyracks/hdfs2/scheduler/SchedulerTest.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs2/scheduler/SchedulerTest.java
similarity index 100%
rename from hyracks/hyracks-hdfs/src/test/java/edu/uci/ics/hyracks/hdfs2/scheduler/SchedulerTest.java
rename to hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs2/scheduler/SchedulerTest.java
diff --git a/hyracks/hyracks-hdfs/src/test/resources/data/customer.tbl b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/resources/data/customer.tbl
similarity index 100%
rename from hyracks/hyracks-hdfs/src/test/resources/data/customer.tbl
rename to hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/resources/data/customer.tbl
diff --git a/hyracks/hyracks-hdfs/src/test/resources/expected/part-0 b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/resources/expected/part-0
similarity index 100%
rename from hyracks/hyracks-hdfs/src/test/resources/expected/part-0
rename to hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/resources/expected/part-0
diff --git a/hyracks/hyracks-hdfs/src/test/resources/hadoop/conf/core-site.xml b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/resources/hadoop/conf/core-site.xml
similarity index 100%
rename from hyracks/hyracks-hdfs/src/test/resources/hadoop/conf/core-site.xml
rename to hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/resources/hadoop/conf/core-site.xml
diff --git a/hyracks/hyracks-hdfs/src/test/resources/hadoop/conf/hdfs-site.xml b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/resources/hadoop/conf/hdfs-site.xml
similarity index 100%
rename from hyracks/hyracks-hdfs/src/test/resources/hadoop/conf/hdfs-site.xml
rename to hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/resources/hadoop/conf/hdfs-site.xml
diff --git a/hyracks/hyracks-hdfs/src/test/resources/hadoop/conf/log4j.properties b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/resources/hadoop/conf/log4j.properties
similarity index 100%
rename from hyracks/hyracks-hdfs/src/test/resources/hadoop/conf/log4j.properties
rename to hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/resources/hadoop/conf/log4j.properties
diff --git a/hyracks/hyracks-hdfs/src/test/resources/hadoop/conf/mapred-site.xml b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/resources/hadoop/conf/mapred-site.xml
similarity index 100%
rename from hyracks/hyracks-hdfs/src/test/resources/hadoop/conf/mapred-site.xml
rename to hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/resources/hadoop/conf/mapred-site.xml
diff --git a/hyracks/hyracks-hdfs/pom.xml b/hyracks/hyracks-hdfs/pom.xml
index 5b1e429..5ed76e9 100644
--- a/hyracks/hyracks-hdfs/pom.xml
+++ b/hyracks/hyracks-hdfs/pom.xml
@@ -1,130 +1,19 @@
-<?xml version="1.0"?>
-<project
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
- xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <artifactId>hyracks</artifactId>
- <groupId>edu.uci.ics.hyracks</groupId>
- <version>0.2.3-SNAPSHOT</version>
- </parent>
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <artifactId>hyracks-hdfs</artifactId>
+ <packaging>pom</packaging>
+ <name>hyracks-hdfs</name>
- <artifactId>hyracks-hdfs</artifactId>
- <name>hyracks-hdfs</name>
+ <parent>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks</artifactId>
+ <version>0.2.3-SNAPSHOT</version>
+ </parent>
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-compiler-plugin</artifactId>
- <version>2.0.2</version>
- <configuration>
- <source>1.6</source>
- <target>1.6</target>
- </configuration>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-surefire-plugin</artifactId>
- <version>2.7.2</version>
- <configuration>
- <forkMode>pertest</forkMode>
- <includes>
- <include>**/*TestSuite.java</include>
- <include>**/*Test.java</include>
- </includes>
- </configuration>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-jar-plugin</artifactId>
- <version>2.2</version>
- <executions>
- <execution>
- <goals>
- <goal>test-jar</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- <plugin>
- <artifactId>maven-clean-plugin</artifactId>
- <version>2.5</version>
- <configuration>
- <filesets>
- <fileset>
- <directory>.</directory>
- <includes>
- <include>edu*</include>
- <include>actual*</include>
- <include>build*</include>
- <include>expect*</include>
- <include>ClusterController*</include>
- <include>edu.uci.*</include>
- </includes>
- </fileset>
- </filesets>
- </configuration>
- </plugin>
- </plugins>
- </build>
-
- <dependencies>
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <version>3.8.1</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>edu.uci.ics.hyracks</groupId>
- <artifactId>hyracks-api</artifactId>
- <version>0.2.3-SNAPSHOT</version>
- <scope>compile</scope>
- </dependency>
- <dependency>
- <groupId>edu.uci.ics.hyracks</groupId>
- <artifactId>hyracks-dataflow-std</artifactId>
- <version>0.2.3-SNAPSHOT</version>
- <scope>compile</scope>
- </dependency>
- <dependency>
- <groupId>edu.uci.ics.hyracks</groupId>
- <artifactId>hyracks-dataflow-common</artifactId>
- <version>0.2.3-SNAPSHOT</version>
- <scope>compile</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-core</artifactId>
- <version>0.20.2</version>
- <type>jar</type>
- <scope>compile</scope>
- </dependency>
- <dependency>
- <groupId>edu.uci.ics.hyracks</groupId>
- <artifactId>hyracks-control-cc</artifactId>
- <version>0.2.3-SNAPSHOT</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>edu.uci.ics.hyracks</groupId>
- <artifactId>hyracks-control-nc</artifactId>
- <version>0.2.3-SNAPSHOT</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-test</artifactId>
- <version>0.20.2</version>
- <type>jar</type>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.commons</groupId>
- <artifactId>commons-io</artifactId>
- <version>1.3.2</version>
- <scope>test</scope>
- </dependency>
- </dependencies>
+ <modules>
+ <module>hyracks-hdfs-0.20.2</module>
+ <module>hyracks-hdfs-0.23.1</module>
+ <module>hyracks-hdfs-core</module>
+ </modules>
</project>
diff --git a/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelSet.java b/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelSet.java
index eb5f716..8fa99be 100644
--- a/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelSet.java
+++ b/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelSet.java
@@ -58,7 +58,7 @@
ChannelControlBlock allocateChannel() throws NetException {
synchronized (mConn) {
- //cleanupClosedChannels();
+ cleanupClosedChannels();
int idx = allocationBitmap.nextClearBit(0);
if (idx < 0 || idx >= ccbArray.length) {
cleanupClosedChannels();
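
Note on ChannelSet: re-enabling cleanupClosedChannels() at the top of allocateChannel() reclaims the ids of closed channels before the bitmap scan, instead of only after the scan runs off the end of ccbArray; without it a long-lived multiplexed connection could exhaust channel ids while closed slots sat unreclaimed. The allocation shape, roughly:

    // Sketch of bitmap-based id allocation with eager reclamation;
    // allocationBitmap is a java.util.BitSet, ccbArray the channel table.
    synchronized (mConn) {
        cleanupClosedChannels();                    // free ids of closed channels first
        int idx = allocationBitmap.nextClearBit(0); // lowest free id
        if (idx < 0 || idx >= ccbArray.length) {
            // table full even after cleanup: grow ccbArray or fail
        }
        allocationBitmap.set(idx);                  // mark the id as taken
        // create and register the new ChannelControlBlock at idx
    }
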
diff --git a/pregelix/pregelix-api/pom.xml b/pregelix/pregelix-api/pom.xml
index 2caa93b..0643804 100644
--- a/pregelix/pregelix-api/pom.xml
+++ b/pregelix/pregelix-api/pom.xml
@@ -77,11 +77,11 @@
<scope>test</scope>
</dependency>
<dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-core</artifactId>
- <version>0.20.2</version>
- <type>jar</type>
- <scope>compile</scope>
- </dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-hdfs-core</artifactId>
+ <version>0.2.3-SNAPSHOT</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
</dependencies>
</project>
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
index b7f9e3d..a8cd3db 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
@@ -26,7 +26,7 @@
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
@@ -66,7 +66,7 @@
/** List of incoming messages from the previous superstep */
private final List<M> msgList = new ArrayList<M>();
/** map context */
- private static Mapper.Context context = null;
+ private static TaskAttemptContext context = null;
/** a delegate for hyracks stuff */
private VertexDelegate<I, V, E, M> delegate = new VertexDelegate<I, V, E, M>(this);
/** this vertex is updated or not */
@@ -444,8 +444,10 @@
/**
* Add a new vertex into the graph
*
- * @param vertexId the vertex id
- * @param vertex the vertex
+ * @param vertexId
+ * the vertex id
+ * @param vertex
+ * the vertex
*/
public final void addVertex(I vertexId, V vertex) {
delegate.addVertex(vertexId, vertex);
@@ -454,7 +456,8 @@
/**
* Delete a vertex from id
*
- * @param vertexId the vertex id
+ * @param vertexId
+ * the vertex id
*/
public final void deleteVertex(I vertexId) {
delegate.deleteVertex(vertexId);
@@ -528,7 +531,7 @@
/**
* Pregelix internal use only
*/
- public static final Mapper<?, ?, ?, ?>.Context getContext() {
+ public static final TaskAttemptContext getContext() {
return context;
}
@@ -537,7 +540,7 @@
*
* @param context
*/
- public static final void setContext(Mapper<?, ?, ?, ?>.Context context) {
+ public static final void setContext(TaskAttemptContext context) {
Vertex.context = context;
}
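
Note on Vertex: narrowing the static context from Mapper<?, ?, ?, ?>.Context to TaskAttemptContext drops the dependency on the concrete Mapper inner class, which is exactly the type the per-version ContextFactory cannot produce uniformly; everything Vertex needs (the Configuration, task ids) is available through TaskAttemptContext. Hypothetical wiring through the shim:

    // Sketch: the context built through the version shim is handed to Vertex,
    // which only needs the TaskAttemptContext view of it.
    TaskAttemptContext context = new ContextFactory().createContext(conf, split);
    Vertex.setContext(context);
    Configuration jobConf = Vertex.getContext().getConfiguration();
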
diff --git a/pregelix/pregelix-core/pom.xml b/pregelix/pregelix-core/pom.xml
index 34faa82..0908829 100644
--- a/pregelix/pregelix-core/pom.xml
+++ b/pregelix/pregelix-core/pom.xml
@@ -244,11 +244,6 @@
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks-data-std</artifactId>
<version>0.2.3-SNAPSHOT</version>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-core</artifactId>
- <version>0.20.2</version>
<type>jar</type>
<scope>compile</scope>
</dependency>
@@ -281,13 +276,6 @@
<scope>compile</scope>
</dependency>
<dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-test</artifactId>
- <version>0.20.2</version>
- <type>jar</type>
- <scope>compile</scope>
- </dependency>
- <dependency>
<groupId>com.kenai.nbpwr</groupId>
<artifactId>org-apache-commons-io</artifactId>
<version>1.3.1-201002241208</version>
diff --git a/pregelix/pregelix-dataflow-std/pom.xml b/pregelix/pregelix-dataflow-std/pom.xml
index 6c039bf..3f3ba00 100644
--- a/pregelix/pregelix-dataflow-std/pom.xml
+++ b/pregelix/pregelix-dataflow-std/pom.xml
@@ -1,14 +1,15 @@
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>pregelix-dataflow-std</artifactId>
<packaging>jar</packaging>
<name>pregelix-dataflow-std</name>
<parent>
- <groupId>edu.uci.ics.hyracks</groupId>
- <artifactId>pregelix</artifactId>
- <version>0.2.3-SNAPSHOT</version>
- </parent>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>pregelix</artifactId>
+ <version>0.2.3-SNAPSHOT</version>
+ </parent>
<properties>
@@ -101,11 +102,13 @@
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks-data-std</artifactId>
<version>0.2.3-SNAPSHOT</version>
+ <type>jar</type>
+ <scope>compile</scope>
</dependency>
<dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-core</artifactId>
- <version>0.20.2</version>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-hdfs-core</artifactId>
+ <version>0.2.3-SNAPSHOT</version>
<type>jar</type>
<scope>compile</scope>
</dependency>
@@ -144,12 +147,5 @@
<type>jar</type>
<scope>compile</scope>
</dependency>
- <dependency>
- <groupId>edu.uci.ics.hyracks</groupId>
- <artifactId>hyracks-hdfs</artifactId>
- <version>0.2.3-SNAPSHOT</version>
- <type>jar</type>
- <scope>compile</scope>
- </dependency>
</dependencies>
</project>
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/FunctionCallOperatorDescriptor.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/FunctionCallOperatorDescriptor.java
index ee3ac82..4cbd6c4 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/FunctionCallOperatorDescriptor.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/FunctionCallOperatorDescriptor.java
@@ -61,22 +61,21 @@
private FrameDeserializer frameDeserializer;
private final IFrameWriter[] writers = new IFrameWriter[outputArity];
private final IFunction function = functionFactory.createFunction();
+ private ClassLoader ctxCL = Thread.currentThread().getContextClassLoader();
@Override
- public void close() throws HyracksDataException {
- if (postHookFactory != null)
- postHookFactory.createRuntimeHook().configure(ctx);
- function.close();
+ public void open() throws HyracksDataException {
+ rd0 = inputRdFactory == null ? recordDescProvider.getInputRecordDescriptor(getActivityId(), 0)
+ : inputRdFactory.createRecordDescriptor();
+ frameDeserializer = new FrameDeserializer(ctx.getFrameSize(), rd0);
+ ctxCL = Thread.currentThread().getContextClassLoader();
+ Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
for (IFrameWriter writer : writers) {
- writer.close();
+ writer.open();
}
- }
-
- @Override
- public void fail() throws HyracksDataException {
- for (IFrameWriter writer : writers) {
- writer.fail();
- }
+ if (preHookFactory != null)
+ preHookFactory.createRuntimeHook().configure(ctx);
+ function.open(ctx, rd0, writers);
}
@Override
@@ -89,17 +88,21 @@
}
@Override
- public void open() throws HyracksDataException {
- rd0 = inputRdFactory == null ? recordDescProvider.getInputRecordDescriptor(getActivityId(), 0)
- : inputRdFactory.createRecordDescriptor();
- frameDeserializer = new FrameDeserializer(ctx.getFrameSize(), rd0);
- Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
+ public void close() throws HyracksDataException {
+ if (postHookFactory != null)
+ postHookFactory.createRuntimeHook().configure(ctx);
+ function.close();
for (IFrameWriter writer : writers) {
- writer.open();
+ writer.close();
}
- if (preHookFactory != null)
- preHookFactory.createRuntimeHook().configure(ctx);
- function.open(ctx, rd0, writers);
+ Thread.currentThread().setContextClassLoader(ctxCL);
+ }
+
+ @Override
+ public void fail() throws HyracksDataException {
+ for (IFrameWriter writer : writers) {
+ writer.fail();
+ }
}
@Override
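
The reordered open()/close() bodies above, together with the FunctionProxy hunk that follows, make one behavioral fix: the operator used to install its own classloader as the thread's context classloader and never restore the previous one, leaking the job's loader onto a pooled worker thread. The new ctxCL field captures the caller's loader in open() and puts it back in close(). The pattern in isolation, as a hedged sketch (the helper class is illustrative, not part of the patch):

    // Capture/swap/restore of the thread context classloader. The field
    // initializer runs before the constructor body, so `saved` records the
    // caller's loader before the swap takes effect.
    public final class ContextClassLoaderSwap implements AutoCloseable {
        private final ClassLoader saved = Thread.currentThread().getContextClassLoader();

        public ContextClassLoaderSwap(ClassLoader replacement) {
            Thread.currentThread().setContextClassLoader(replacement);
        }

        @Override
        public void close() {
            Thread.currentThread().setContextClassLoader(saved);
        }
    }

In the operator itself the swap and the restore are necessarily split between open() and close() rather than held in a single try/finally, because frames arrive through separate nextFrame() calls in between.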
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/FunctionProxy.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/FunctionProxy.java
index 82ac18e..99bca1a 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/FunctionProxy.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/FunctionProxy.java
@@ -37,6 +37,7 @@
private final IFrameWriter[] writers;
private TupleDeserializer tupleDe;
private RecordDescriptor inputRd;
+ private ClassLoader ctxCL;
public FunctionProxy(IHyracksTaskContext ctx, IUpdateFunctionFactory functionFactory,
IRuntimeHookFactory preHookFactory, IRuntimeHookFactory postHookFactory,
@@ -57,6 +58,7 @@
public void functionOpen() throws HyracksDataException {
inputRd = inputRdFactory.createRecordDescriptor();
tupleDe = new TupleDeserializer(inputRd);
+ ctxCL = Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
for (IFrameWriter writer : writers) {
writer.open();
@@ -124,5 +126,6 @@
for (IFrameWriter writer : writers) {
writer.close();
}
+ Thread.currentThread().setContextClassLoader(ctxCL);
}
}
diff --git a/pregelix/pregelix-dataflow/pom.xml b/pregelix/pregelix-dataflow/pom.xml
index 31b6adc..107f059 100644
--- a/pregelix/pregelix-dataflow/pom.xml
+++ b/pregelix/pregelix-dataflow/pom.xml
@@ -1,14 +1,15 @@
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>pregelix-dataflow</artifactId>
<packaging>jar</packaging>
<name>pregelix-dataflow</name>
<parent>
- <groupId>edu.uci.ics.hyracks</groupId>
- <artifactId>pregelix</artifactId>
- <version>0.2.3-SNAPSHOT</version>
- </parent>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>pregelix</artifactId>
+ <version>0.2.3-SNAPSHOT</version>
+ </parent>
<properties>
@@ -103,13 +104,6 @@
<version>0.2.3-SNAPSHOT</version>
</dependency>
<dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-core</artifactId>
- <version>0.20.2</version>
- <type>jar</type>
- <scope>compile</scope>
- </dependency>
- <dependency>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks-storage-am-common</artifactId>
<version>0.2.3-SNAPSHOT</version>
@@ -144,12 +138,5 @@
<type>jar</type>
<scope>compile</scope>
</dependency>
- <dependency>
- <groupId>edu.uci.ics.hyracks</groupId>
- <artifactId>hyracks-hdfs</artifactId>
- <version>0.2.3-SNAPSHOT</version>
- <type>jar</type>
- <scope>compile</scope>
- </dependency>
</dependencies>
</project>
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/HDFSFileWriteOperatorDescriptor.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/HDFSFileWriteOperatorDescriptor.java
index c25e4c6..0133d76 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/HDFSFileWriteOperatorDescriptor.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/HDFSFileWriteOperatorDescriptor.java
@@ -64,17 +64,20 @@
return new AbstractUnaryInputSinkOperatorNodePushable() {
private RecordDescriptor rd0;
private FrameDeserializer frameDeserializer;
- private Configuration conf = confFactory.createConfiguration();
+ private Configuration conf;
private VertexWriter vertexWriter;
private TaskAttemptContext context;
private String TEMP_DIR = "_temporary";
+ private ClassLoader ctxCL;
@Override
public void open() throws HyracksDataException {
rd0 = inputRdFactory == null ? recordDescProvider.getInputRecordDescriptor(getActivityId(), 0)
: inputRdFactory.createRecordDescriptor();
frameDeserializer = new FrameDeserializer(ctx.getFrameSize(), rd0);
+ ctxCL = Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
+ conf = confFactory.createConfiguration();
VertexOutputFormat outputFormat = BspUtils.createVertexOutputFormat(conf);
TaskAttemptID tid = new TaskAttemptID("", 0, true, partition, 0);
@@ -107,7 +110,7 @@
@Override
public void fail() throws HyracksDataException {
-
+ Thread.currentThread().setContextClassLoader(ctxCL);
}
@Override
@@ -151,6 +154,8 @@
dfs.rename(srcFile, filePath);
} catch (IOException e) {
throw new HyracksDataException(e);
+ } finally {
+ Thread.currentThread().setContextClassLoader(ctxCL);
}
}
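
The pivotal change in this file is moving confFactory.createConfiguration() out of the field initializer and below the classloader swap in open(). Hadoop's Configuration captures the thread context classloader at construction time and uses it for later getClass()/getClassByName() lookups, so a Configuration built eagerly on the worker thread would resolve the vertex and output-format classes against the wrong loader. The hunk also restores ctxCL in fail() and in a finally around the output rename, so every exit path leaves the thread as it found it. A condensed sketch of the ordering, with illustrative names:

    import org.apache.hadoop.conf.Configuration;

    public final class ConfAfterSwapExample {
        // Build the job Configuration only once the job classloader is the
        // thread context classloader; Configuration pins that loader for
        // all subsequent class lookups.
        static Configuration createJobConf(ClassLoader jobClassLoader) {
            ClassLoader saved = Thread.currentThread().getContextClassLoader();
            Thread.currentThread().setContextClassLoader(jobClassLoader);
            try {
                return new Configuration();
            } finally {
                Thread.currentThread().setContextClassLoader(saved);
            }
        }
    }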
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileScanOperatorDescriptor.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileScanOperatorDescriptor.java
index cb0e339..a38b19e 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileScanOperatorDescriptor.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileScanOperatorDescriptor.java
@@ -25,8 +25,6 @@
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
@@ -43,6 +41,7 @@
import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
+import edu.uci.ics.hyracks.hdfs.ContextFactory;
import edu.uci.ics.hyracks.hdfs2.dataflow.FileSplitsFactory;
import edu.uci.ics.pregelix.api.graph.Vertex;
import edu.uci.ics.pregelix.api.io.VertexInputFormat;
@@ -83,12 +82,15 @@
final List<FileSplit> splits = splitsFactory.getSplits();
return new AbstractUnaryOutputSourceOperatorNodePushable() {
- private Configuration conf = confFactory.createConfiguration();
+ private ClassLoader ctxCL;
+ private ContextFactory ctxFactory = new ContextFactory();
@Override
public void initialize() throws HyracksDataException {
+ ctxCL = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
+ Configuration conf = confFactory.createConfiguration();
writer.open();
for (int i = 0; i < scheduledLocations.length; i++) {
if (scheduledLocations[i].equals(ctx.getJobletContext().getApplicationContext().getNodeId())) {
@@ -102,12 +104,14 @@
continue;
}
}
- loadVertices(ctx, i);
+ loadVertices(ctx, conf, i);
}
}
writer.close();
} catch (Exception e) {
throw new HyracksDataException(e);
+ } finally {
+ Thread.currentThread().setContextClassLoader(ctxCL);
}
}
@@ -122,8 +126,9 @@
* @throws InterruptedException
*/
@SuppressWarnings("unchecked")
- private void loadVertices(final IHyracksTaskContext ctx, int splitId) throws IOException,
- ClassNotFoundException, InterruptedException, InstantiationException, IllegalAccessException {
+ private void loadVertices(final IHyracksTaskContext ctx, Configuration conf, int splitId)
+ throws IOException, ClassNotFoundException, InterruptedException, InstantiationException,
+ IllegalAccessException {
ByteBuffer frame = ctx.allocateFrame();
FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
appender.reset(frame, true);
@@ -141,8 +146,7 @@
/**
* set context
*/
- Context mapperContext = new Mapper().new Context(conf, new TaskAttemptID(), null, null, null, null,
- splits.get(splitId));
+ TaskAttemptContext mapperContext = ctxFactory.createContext(conf, splits.get(splitId));
Vertex.setContext(mapperContext);
/**
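
The expression removed in this hunk, new Mapper().new Context(conf, new TaskAttemptID(), null, null, null, null, split), compiles only against Hadoop 0.20.x, where Context is an instantiable inner class; newer Hadoop lines make these types abstract classes or interfaces. Funneling construction through hyracks-hdfs's ContextFactory (the same substitution appears in RuntimeHookFactory below) confines the version-specific construction to one class that can vary per build. As a sketch only, under the assumption of the old 0.20-style API, such a factory might look like this (the real hyracks-hdfs implementation may differ):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.TaskAttemptID;

    // Hypothetical Hadoop-0.20-flavored shim: TaskAttemptContext is a
    // concrete class there, so it can be constructed directly. The split
    // parameter is accepted for call-site symmetry; a TaskAttemptContext
    // does not itself carry the split.
    public class ContextFactorySketch {
        public TaskAttemptContext createContext(Configuration conf, InputSplit split) {
            return new TaskAttemptContext(conf, new TaskAttemptID());
        }
    }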
diff --git a/pregelix/pregelix-runtime/pom.xml b/pregelix/pregelix-runtime/pom.xml
index 1c414ff..82be6b9 100644
--- a/pregelix/pregelix-runtime/pom.xml
+++ b/pregelix/pregelix-runtime/pom.xml
@@ -117,13 +117,6 @@
<version>0.2.3-SNAPSHOT</version>
</dependency>
<dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-core</artifactId>
- <version>0.20.2</version>
- <type>jar</type>
- <scope>compile</scope>
- </dependency>
- <dependency>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks-storage-am-common</artifactId>
<version>0.2.3-SNAPSHOT</version>
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/RuntimeHookFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/RuntimeHookFactory.java
index f7d0018..d968262 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/RuntimeHookFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/RuntimeHookFactory.java
@@ -15,12 +15,11 @@
package edu.uci.ics.pregelix.runtime.touchpoint;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.Mapper.Context;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.hdfs.ContextFactory;
import edu.uci.ics.pregelix.api.graph.Vertex;
import edu.uci.ics.pregelix.api.util.BspUtils;
import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
@@ -40,14 +39,13 @@
public IRuntimeHook createRuntimeHook() {
return new IRuntimeHook() {
+ private ContextFactory ctxFactory = new ContextFactory();
- @SuppressWarnings({ "rawtypes", "unchecked" })
@Override
public void configure(IHyracksTaskContext ctx) throws HyracksDataException {
Configuration conf = confFactory.createConfiguration();
try {
- Context mapperContext = new Mapper().new Context(conf, new TaskAttemptID(), null, null, null, null,
- null);
+ TaskAttemptContext mapperContext = ctxFactory.createContext(conf, null);
Vertex.setContext(mapperContext);
BspUtils.setDefaultConfiguration(conf);
} catch (Exception e) {