Let hivesterix use hyracks-hdfs: replace the custom Hive file-scan operators with the shared hyracks-hdfs read operator and key-value parser API.
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_release_cleanup@3083 123451ca-8445-de46-9d55-352943316053
diff --git a/hivesterix/hivesterix-common/src/main/java/edu/uci/ics/hivesterix/common/config/ConfUtil.java b/hivesterix/hivesterix-common/src/main/java/edu/uci/ics/hivesterix/common/config/ConfUtil.java
index d41bdc8..666d361 100644
--- a/hivesterix/hivesterix-common/src/main/java/edu/uci/ics/hivesterix/common/config/ConfUtil.java
+++ b/hivesterix/hivesterix-common/src/main/java/edu/uci/ics/hivesterix/common/config/ConfUtil.java
@@ -27,6 +27,7 @@
@SuppressWarnings({ "rawtypes", "deprecation" })
public class ConfUtil {
+ private static final String clusterPropertiesPath = "conf/cluster.properties";
private static JobConf job;
private static HiveConf hconf;
@@ -34,8 +35,8 @@
private static Map<String, List<String>> ncMapping;
private static IHyracksClientConnection hcc = null;
private static ClusterTopology topology = null;
- private static final String clusterPropertiesPath = "conf/cluster.properties";
private static Properties clusterProps;
+ private static Map<String, NodeControllerInfo> ncNameToNcInfos;
public static JobConf getJobConf(Class<? extends InputFormat> format, Path path) {
JobConf conf = new JobConf();
@@ -109,6 +110,17 @@
return ncMapping;
}
+ public static Map<String, NodeControllerInfo> getNodeControllerInfo() throws HyracksException {
+ if (ncNameToNcInfos == null) {
+ try {
+ loadClusterConfig();
+ } catch (Exception e) {
+ throw new HyracksException(e);
+ }
+ }
+ return ncNameToNcInfos;
+ }
+
private static void loadClusterConfig() {
try {
getHiveConf();
@@ -132,7 +144,7 @@
hcc = new HyracksConnection(ipAddress, port);
topology = hcc.getClusterTopology();
- Map<String, NodeControllerInfo> ncNameToNcInfos = hcc.getNodeControllerInfos();
+ ncNameToNcInfos = hcc.getNodeControllerInfos();
NCs = new String[ncNameToNcInfos.size() * mpl];
ncMapping = new HashMap<String, List<String>>();
int i = 0;
diff --git a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/jobgen/HiveScanRuntimeGenerator.java b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/jobgen/HiveScanRuntimeGenerator.java
index f3af915..0d2c78a 100644
--- a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/jobgen/HiveScanRuntimeGenerator.java
+++ b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/jobgen/HiveScanRuntimeGenerator.java
@@ -1,16 +1,17 @@
package edu.uci.ics.hivesterix.runtime.jobgen;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
+import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import edu.uci.ics.hivesterix.common.config.ConfUtil;
-import edu.uci.ics.hivesterix.runtime.operator.filescan.HiveFileScanOperatorDescriptor;
-import edu.uci.ics.hivesterix.runtime.operator.filescan.HiveFileSplitProvider;
-import edu.uci.ics.hivesterix.runtime.operator.filescan.HiveTupleParserFactory;
+import edu.uci.ics.hivesterix.runtime.operator.filescan.HiveKeyValueParserFactory;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
@@ -19,12 +20,13 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
import edu.uci.ics.hyracks.algebricks.data.ISerializerDeserializerProvider;
+import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
-import edu.uci.ics.hyracks.dataflow.std.file.ITupleParserFactory;
+import edu.uci.ics.hyracks.hdfs.dataflow.HDFSReadOperatorDescriptor;
+import edu.uci.ics.hyracks.hdfs.scheduler.Scheduler;
@SuppressWarnings({ "rawtypes", "deprecation" })
public class HiveScanRuntimeGenerator {
@@ -80,15 +82,18 @@
// get record descriptor
RecordDescriptor recDescriptor = mkRecordDescriptor(propagatedSchema, schemaTypes, context);
- // setup the run time operator
+ // setup the run time operator and constraints
JobConf conf = ConfUtil.getJobConf(fileDesc.getInputFileFormatClass(), filePath);
- int clusterSize = ConfUtil.getNCs().length;
- IFileSplitProvider fsprovider = new HiveFileSplitProvider(conf, filePathName, clusterSize);
- ITupleParserFactory tupleParserFactory = new HiveTupleParserFactory(fileDesc, conf, outputColumnsOffset);
- HiveFileScanOperatorDescriptor opDesc = new HiveFileScanOperatorDescriptor(jobSpec, fsprovider,
- tupleParserFactory, recDescriptor);
+ String[] locConstraints = ConfUtil.getNCs();
+ Map<String, NodeControllerInfo> ncNameToNcInfos = ConfUtil.getNodeControllerInfo();
+ Scheduler scheduler = new Scheduler(ncNameToNcInfos);
+ InputSplit[] splits = conf.getInputFormat().getSplits(conf, locConstraints.length);
+ String[] schedule = scheduler.getLocationConstraints(splits);
+ IOperatorDescriptor scanner = new HDFSReadOperatorDescriptor(jobSpec, recDescriptor, conf, splits,
+ schedule, new HiveKeyValueParserFactory(fileDesc, conf, outputColumnsOffset));
- return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(opDesc, opDesc.getPartitionConstraint());
+ return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(scanner,
+ new AlgebricksAbsolutePartitionConstraint(locConstraints));
} catch (Exception e) {
throw new AlgebricksException(e);
}
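
For reference, the rewritten scan above follows the generic hyracks-hdfs read pattern. The sketch below is not part of this change; it wires the same pieces (Scheduler, HDFSReadOperatorDescriptor, a key-value parser factory) outside of Hive. The HDFS path, the client connection, the single-string record descriptor, and the PartitionConstraintHelper call are illustrative assumptions reflecting typical standalone Hyracks usage.

// Minimal sketch of the generic hyracks-hdfs read path (illustrative only).
import java.util.Map;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;

import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
import edu.uci.ics.hyracks.hdfs.dataflow.HDFSReadOperatorDescriptor;
import edu.uci.ics.hyracks.hdfs.lib.TextKeyValueParserFactory;
import edu.uci.ics.hyracks.hdfs.scheduler.Scheduler;

public class HdfsScanSketch {
    public static JobSpecification buildReadJob(IHyracksClientConnection hcc) throws Exception {
        JobConf conf = new JobConf();
        conf.setInputFormat(TextInputFormat.class);
        // placeholder input location
        FileInputFormat.setInputPaths(conf, new Path("hdfs://namenode:9000/example/input"));

        Map<String, NodeControllerInfo> ncInfos = hcc.getNodeControllerInfos();
        Scheduler scheduler = new Scheduler(ncInfos);                  // locality-aware split scheduling
        InputSplit[] splits = conf.getInputFormat().getSplits(conf, ncInfos.size());
        String[] schedule = scheduler.getLocationConstraints(splits);  // one NC name per split

        JobSpecification jobSpec = new JobSpecification();
        RecordDescriptor recDesc = new RecordDescriptor(
                new ISerializerDeserializer[] { UTF8StringSerializerDeserializer.INSTANCE });
        HDFSReadOperatorDescriptor reader = new HDFSReadOperatorDescriptor(jobSpec, recDesc, conf,
                splits, schedule, new TextKeyValueParserFactory());
        // pin each scan partition to the NC chosen by the scheduler
        PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, reader, schedule);
        return jobSpec;
    }
}

In the Algebricks path above, the same location information is instead returned as an AlgebricksAbsolutePartitionConstraint over the NC names.
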
diff --git a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/operator/filescan/AbstractHiveFileSplitProvider.java b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/operator/filescan/AbstractHiveFileSplitProvider.java
deleted file mode 100644
index 03f3312..0000000
--- a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/operator/filescan/AbstractHiveFileSplitProvider.java
+++ /dev/null
@@ -1,18 +0,0 @@
-package edu.uci.ics.hivesterix.runtime.operator.filescan;
-
-import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
-import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
-
-public abstract class AbstractHiveFileSplitProvider implements IFileSplitProvider {
- private static final long serialVersionUID = 1L;
-
- @Override
- public FileSplit[] getFileSplits() {
- // TODO Auto-generated method stub
- return null;
- }
-
- @SuppressWarnings("deprecation")
- public abstract org.apache.hadoop.mapred.FileSplit[] getFileSplitArray();
-
-}
diff --git a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/operator/filescan/AbstractHiveTupleParser.java b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/operator/filescan/AbstractHiveTupleParser.java
deleted file mode 100644
index 485e1d0..0000000
--- a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/operator/filescan/AbstractHiveTupleParser.java
+++ /dev/null
@@ -1,27 +0,0 @@
-package edu.uci.ics.hivesterix.runtime.operator.filescan;
-
-import java.io.InputStream;
-
-import org.apache.hadoop.mapred.FileSplit;
-
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.std.file.ITupleParser;
-
-@SuppressWarnings("deprecation")
-public abstract class AbstractHiveTupleParser implements ITupleParser {
-
- @Override
- public void parse(InputStream in, IFrameWriter writer) throws HyracksDataException {
- // empty implementation
- }
-
- /**
- * method for parsing HDFS file split
- *
- * @param split
- * @param writer
- */
- abstract public void parse(FileSplit split, IFrameWriter writer) throws HyracksDataException;
-
-}
diff --git a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/operator/filescan/HiveFileScanOperatorDescriptor.java b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/operator/filescan/HiveFileScanOperatorDescriptor.java
deleted file mode 100644
index a64d398..0000000
--- a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/operator/filescan/HiveFileScanOperatorDescriptor.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hivesterix.runtime.operator.filescan;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-
-import org.apache.hadoop.mapred.FileSplit;
-
-import edu.uci.ics.hivesterix.common.config.ConfUtil;
-import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
-import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
-import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
-import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
-import edu.uci.ics.hyracks.dataflow.std.file.ITupleParser;
-import edu.uci.ics.hyracks.dataflow.std.file.ITupleParserFactory;
-
-@SuppressWarnings("deprecation")
-public class HiveFileScanOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
- private static final long serialVersionUID = 1L;
-
- /**
- * tuple parser factory
- */
- private final ITupleParserFactory tupleParserFactory;
-
- /**
- * Hive file split
- */
- private Partition[] parts;
-
- /**
- * IFileSplitProvider
- */
- private IFileSplitProvider fileSplitProvider;
-
- /**
- * constrains in the form of host DNS names
- */
- private String[] constraintsByHostNames;
-
- /**
- * ip-to-node controller mapping
- */
- private Map<String, List<String>> ncMapping;
-
- /**
- * an array of NCs
- */
- private String[] NCs;
-
- /**
- * @param spec
- * @param fsProvider
- */
- public HiveFileScanOperatorDescriptor(JobSpecification spec, IFileSplitProvider fsProvider,
- ITupleParserFactory tupleParserFactory, RecordDescriptor rDesc) {
- super(spec, 0, 1);
- this.tupleParserFactory = tupleParserFactory;
- recordDescriptors[0] = rDesc;
- fileSplitProvider = fsProvider;
- }
-
- /**
- * set partition constraint at the first time it is called the number of
- * partitions is obtained from HDFS name node
- */
- public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws AlgebricksException {
- try {
- FileSplit[] returnedSplits = ((AbstractHiveFileSplitProvider) fileSplitProvider).getFileSplitArray();
- Random random = new Random(System.currentTimeMillis());
- ncMapping = ConfUtil.getNCMapping();
- NCs = ConfUtil.getNCs();
-
- int size = 0;
- for (FileSplit split : returnedSplits)
- if (split != null)
- size++;
-
- FileSplit[] splits = new FileSplit[size];
- for (int i = 0; i < returnedSplits.length; i++)
- if (returnedSplits[i] != null)
- splits[i] = returnedSplits[i];
-
- System.out.println("number of splits: " + splits.length);
- constraintsByHostNames = new String[splits.length];
- for (int i = 0; i < splits.length; i++) {
- try {
- String[] loc = splits[i].getLocations();
- Collections.shuffle(Arrays.asList(loc), random);
- if (loc.length > 0) {
- InetAddress[] allIps = InetAddress.getAllByName(loc[0]);
- for (InetAddress ip : allIps) {
- if (ncMapping.get(ip.getHostAddress()) != null) {
- List<String> ncs = ncMapping.get(ip.getHostAddress());
- int pos = random.nextInt(ncs.size());
- constraintsByHostNames[i] = ncs.get(pos);
- } else {
- int pos = random.nextInt(NCs.length);
- constraintsByHostNames[i] = NCs[pos];
- }
- }
- } else {
- int pos = random.nextInt(NCs.length);
- constraintsByHostNames[i] = NCs[pos];
- if (splits[i].getLength() > 0)
- throw new IllegalStateException("non local scanner non locations!!");
- }
- } catch (IOException e) {
- throw new AlgebricksException(e);
- }
- }
-
- parts = new Partition[splits.length];
- for (int i = 0; i < splits.length; i++) {
- parts[i] = new Partition(splits[i]);
- }
- return new AlgebricksAbsolutePartitionConstraint(constraintsByHostNames);
- } catch (Exception e) {
- throw new AlgebricksException(e);
- }
- }
-
- @Override
- public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
- IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
-
- final ITupleParser tp = tupleParserFactory.createTupleParser(ctx);
- final int partitionId = partition;
-
- return new AbstractUnaryOutputSourceOperatorNodePushable() {
-
- @Override
- public void initialize() throws HyracksDataException {
- writer.open();
- FileSplit split = parts[partitionId].toFileSplit();
- if (split == null)
- throw new HyracksDataException("partition " + partitionId + " is null!");
- ((AbstractHiveTupleParser) tp).parse(split, writer);
- writer.close();
- }
- };
- }
-}
\ No newline at end of file
diff --git a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/operator/filescan/HiveFileSplitProvider.java b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/operator/filescan/HiveFileSplitProvider.java
deleted file mode 100644
index af52f27..0000000
--- a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/operator/filescan/HiveFileSplitProvider.java
+++ /dev/null
@@ -1,108 +0,0 @@
-package edu.uci.ics.hivesterix.runtime.operator.filescan;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.io.PrintWriter;
-import java.util.UUID;
-
-import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.JobConf;
-import org.eclipse.jetty.util.log.Log;
-
-@SuppressWarnings({ "deprecation", "rawtypes" })
-public class HiveFileSplitProvider extends AbstractHiveFileSplitProvider {
- private static final long serialVersionUID = 1L;
-
- private transient InputFormat format;
- private transient JobConf conf;
- private String confContent;
- final private int nPartition;
- private transient FileSplit[] splits;
-
- public HiveFileSplitProvider(JobConf conf, String filePath, int nPartition) {
- format = conf.getInputFormat();
- this.conf = conf;
- this.nPartition = nPartition;
- writeConfContent();
- }
-
- private void writeConfContent() {
- File dir = new File("hadoop-conf-tmp");
- if (!dir.exists()) {
- dir.mkdir();
- }
-
- String fileName = "hadoop-conf-tmp/" + UUID.randomUUID() + System.currentTimeMillis() + ".xml";
- try {
- DataOutputStream out = new DataOutputStream(new FileOutputStream(new File(fileName)));
- conf.writeXml(out);
- out.close();
-
- DataInputStream in = new DataInputStream(new FileInputStream(fileName));
- StringBuffer buffer = new StringBuffer();
- String line;
- while ((line = in.readLine()) != null) {
- buffer.append(line + "\n");
- }
- in.close();
- confContent = buffer.toString();
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-
- private void readConfContent() {
- File dir = new File("hadoop-conf-tmp");
- if (!dir.exists()) {
- dir.mkdir();
- }
-
- String fileName = "hadoop-conf-tmp/" + UUID.randomUUID() + System.currentTimeMillis() + ".xml";
- try {
- PrintWriter out = new PrintWriter((new OutputStreamWriter(new FileOutputStream(new File(fileName)))));
- out.write(confContent);
- out.close();
- conf = new JobConf(fileName);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-
- @Override
- /**
- * get the HDFS file split
- */
- public FileSplit[] getFileSplitArray() {
- readConfContent();
- conf.setClassLoader(this.getClass().getClassLoader());
- format = conf.getInputFormat();
- // int splitSize = conf.getInt("mapred.min.split.size", 0);
-
- if (splits == null) {
- try {
- splits = (org.apache.hadoop.mapred.FileSplit[]) format.getSplits(conf, nPartition);
- System.out.println("hdfs split number: " + splits.length);
- } catch (IOException e) {
- String inputPath = conf.get("mapred.input.dir");
- String hdfsURL = conf.get("fs.default.name");
- String alternatePath = inputPath.replaceAll(hdfsURL, "file:");
- conf.set("mapred.input.dir", alternatePath);
- try {
- splits = (org.apache.hadoop.mapred.FileSplit[]) format.getSplits(conf, nPartition);
- System.out.println("hdfs split number: " + splits.length);
- } catch (IOException e1) {
- e1.printStackTrace();
- Log.debug(e1.getMessage());
- return null;
- }
- }
- }
- return splits;
- }
-}
diff --git a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/operator/filescan/HiveKeyValueParser.java b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/operator/filescan/HiveKeyValueParser.java
new file mode 100644
index 0000000..472994a
--- /dev/null
+++ b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/operator/filescan/HiveKeyValueParser.java
@@ -0,0 +1,209 @@
+package edu.uci.ics.hivesterix.runtime.operator.filescan;
+
+import java.io.DataOutput;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazySimpleStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import edu.uci.ics.hivesterix.serde.parser.IHiveParser;
+import edu.uci.ics.hivesterix.serde.parser.TextToBinaryTupleParser;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.hdfs.api.IKeyValueParser;
+
+@SuppressWarnings("deprecation")
+public class HiveKeyValueParser<K, V> implements IKeyValueParser<K, V> {
+ /**
+ * the columns to output: projection is pushed into this scan
+ */
+ private int[] outputColumnsOffset;
+
+ /**
+ * serialization/de-serialization object
+ */
+ private SerDe serDe;
+
+ /**
+ * the input row object inspector
+ */
+ private StructObjectInspector structInspector;
+
+ /**
+ * the hadoop job conf
+ */
+ private JobConf job;
+
+ /**
+ * the Hyracks task context used to control resource allocation
+ */
+ private final IHyracksTaskContext ctx;
+
+ /**
+ * the lazy SerDe that formats the data flow between operators
+ */
+ private final SerDe outputSerDe;
+
+ /**
+ * the parser from hive data to binary data
+ */
+ private IHiveParser parser;
+
+ /**
+ * the buffer for buffering output data
+ */
+ private ByteBuffer buffer;
+
+ /**
+ * the frame tuple appender
+ */
+ private FrameTupleAppender appender;
+
+ /**
+ * the array tuple builder
+ */
+ private ArrayTupleBuilder tb;
+
+ /**
+ * the field references of all fields
+ */
+ private List<? extends StructField> fieldRefs;
+
+ /**
+ * output fields
+ */
+ private Object[] outputFields;
+
+ /**
+ * output field references
+ */
+ private StructField[] outputFieldRefs;
+
+ public HiveKeyValueParser(String serDeClass, String outputSerDeClass, Properties tbl, JobConf conf,
+ final IHyracksTaskContext ctx, int[] outputColumnsOffset) throws HyracksDataException {
+ try {
+ job = conf;
+ // initialize the input serde
+ serDe = (SerDe) ReflectionUtils.newInstance(Class.forName(serDeClass), job);
+ serDe.initialize(job, tbl);
+ // initialize the output serde
+ outputSerDe = (SerDe) ReflectionUtils.newInstance(Class.forName(outputSerDeClass), job);
+ outputSerDe.initialize(job, tbl);
+ // object inspector of the row
+ structInspector = (StructObjectInspector) serDe.getObjectInspector();
+ // hyracks context
+ this.ctx = ctx;
+ this.outputColumnsOffset = outputColumnsOffset;
+
+ if (structInspector instanceof LazySimpleStructObjectInspector) {
+ LazySimpleStructObjectInspector rowInspector = (LazySimpleStructObjectInspector) structInspector;
+ List<? extends StructField> fieldRefs = rowInspector.getAllStructFieldRefs();
+ boolean lightWeightParsable = true;
+ for (StructField fieldRef : fieldRefs) {
+ Category category = fieldRef.getFieldObjectInspector().getCategory();
+ if (!(category == Category.PRIMITIVE)) {
+ lightWeightParsable = false;
+ break;
+ }
+ }
+ if (lightWeightParsable) {
+ parser = new TextToBinaryTupleParser(this.outputColumnsOffset, structInspector);
+ }
+ }
+
+ fieldRefs = structInspector.getAllStructFieldRefs();
+ int size = 0;
+ for (int i = 0; i < outputColumnsOffset.length; i++) {
+ if (outputColumnsOffset[i] >= 0) {
+ size++;
+ }
+ }
+
+ tb = new ArrayTupleBuilder(size);
+ outputFieldRefs = new StructField[size];
+ outputFields = new Object[size];
+ for (int i = 0; i < outputColumnsOffset.length; i++)
+ if (outputColumnsOffset[i] >= 0)
+ outputFieldRefs[outputColumnsOffset[i]] = fieldRefs.get(i);
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ @Override
+ public void open(IFrameWriter writer) throws HyracksDataException {
+ buffer = ctx.allocateFrame();
+ appender = new FrameTupleAppender(ctx.getFrameSize());
+ appender.reset(buffer, true);
+ }
+
+ @Override
+ public void parse(K key, V value, IFrameWriter writer) throws HyracksDataException {
+ try {
+ tb.reset();
+ if (parser != null) {
+ Text text = (Text) value;
+ parser.parse(text.getBytes(), 0, text.getLength(), tb);
+ } else {
+ Object row = serDe.deserialize((Writable) value);
+ /**
+ * write fields to the tuple builder one by one
+ */
+ int i = 0;
+ for (StructField fieldRef : fieldRefs) {
+ if (outputColumnsOffset[i] >= 0)
+ outputFields[outputColumnsOffset[i]] = structInspector.getStructFieldData(row, fieldRef);
+ i++;
+ }
+ i = 0;
+ DataOutput dos = tb.getDataOutput();
+ for (Object field : outputFields) {
+ BytesWritable fieldWritable = (BytesWritable) outputSerDe.serialize(field,
+ outputFieldRefs[i].getFieldObjectInspector());
+ dos.write(fieldWritable.getBytes(), 0, fieldWritable.getSize());
+ tb.addFieldEndOffset();
+ i++;
+ }
+ }
+
+ /**
+ * append the tuple and flush it if necessary.
+ */
+ if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+ FrameUtils.flushFrame(buffer, writer);
+ appender.reset(buffer, true);
+ if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+ throw new IllegalStateException();
+ }
+ }
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ @Override
+ public void close(IFrameWriter writer) throws HyracksDataException {
+ /**
+ * flush the residual tuples
+ */
+ if (appender.getTupleCount() > 0) {
+ FrameUtils.flushFrame(buffer, writer);
+ }
+ System.gc();
+ }
+
+}
diff --git a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/operator/filescan/HiveKeyValueParserFactory.java b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/operator/filescan/HiveKeyValueParserFactory.java
new file mode 100644
index 0000000..05903b9
--- /dev/null
+++ b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/operator/filescan/HiveKeyValueParserFactory.java
@@ -0,0 +1,39 @@
+package edu.uci.ics.hivesterix.runtime.operator.filescan;
+
+import java.util.Properties;
+
+import org.apache.hadoop.hive.ql.plan.PartitionDesc;
+import org.apache.hadoop.mapred.JobConf;
+
+import edu.uci.ics.hivesterix.serde.lazy.LazySerDe;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.hdfs.api.IKeyValueParser;
+import edu.uci.ics.hyracks.hdfs.api.IKeyValueParserFactory;
+import edu.uci.ics.hyracks.hdfs.dataflow.ConfFactory;
+
+@SuppressWarnings("deprecation")
+public class HiveKeyValueParserFactory<K, V> implements IKeyValueParserFactory<K, V> {
+ private static final long serialVersionUID = 1L;
+ private final String serDeClass;
+ private final String outputSerDeClass = LazySerDe.class.getName();
+ private final Properties tbl;
+ private final ConfFactory confFactory;
+ private final int[] outputColumnsOffset;
+
+ public HiveKeyValueParserFactory(PartitionDesc desc, JobConf conf, int[] outputColumnsOffset)
+ throws HyracksDataException {
+ this.tbl = desc.getProperties();
+ this.serDeClass = (String) tbl.getProperty("serialization.lib");
+ this.outputColumnsOffset = outputColumnsOffset;
+ this.confFactory = new ConfFactory(conf);
+ }
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ @Override
+ public IKeyValueParser<K, V> createKeyValueParser(IHyracksTaskContext ctx) throws HyracksDataException {
+ return new HiveKeyValueParser(serDeClass, outputSerDeClass, tbl, confFactory.getConf(), ctx,
+ outputColumnsOffset);
+ }
+
+}
diff --git a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/operator/filescan/HiveTupleParser.java b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/operator/filescan/HiveTupleParser.java
deleted file mode 100644
index 718c311..0000000
--- a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/operator/filescan/HiveTupleParser.java
+++ /dev/null
@@ -1,215 +0,0 @@
-package edu.uci.ics.hivesterix.runtime.operator.filescan;
-
-import java.io.DataOutput;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.Properties;
-
-import org.apache.hadoop.hive.serde2.SerDe;
-import org.apache.hadoop.hive.serde2.SerDeException;
-import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazySimpleStructObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
-import org.apache.hadoop.hive.serde2.objectinspector.StructField;
-import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.util.ReflectionUtils;
-
-import edu.uci.ics.hivesterix.serde.parser.IHiveParser;
-import edu.uci.ics.hivesterix.serde.parser.TextToBinaryTupleParser;
-import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
-
-@SuppressWarnings({ "rawtypes", "deprecation", "unchecked" })
-public class HiveTupleParser extends AbstractHiveTupleParser {
-
- private int[] outputColumnsOffset;
- /**
- * class of input format
- */
- private InputFormat inputFormat;
-
- /**
- * serialization/deserialization object
- */
- private SerDe serDe;
-
- /**
- * the input row object inspector
- */
- private ObjectInspector objectInspector;
-
- /**
- * the hadoop job conf
- */
- private JobConf job;
-
- /**
- * Hyrax context to control resource allocation
- */
- private final IHyracksTaskContext ctx;
-
- /**
- * lazy serde: format flow in between operators
- */
- private final SerDe outputSerDe;
-
- /**
- * the parser from hive data to binary data
- */
- private IHiveParser parser = null;
-
- /**
- * parser for any hive input format
- *
- * @param inputFormatClass
- * @param serDeClass
- * @param tbl
- * @param conf
- * @throws AlgebricksException
- */
- public HiveTupleParser(String inputFormatClass, String serDeClass, String outputSerDeClass, Properties tbl,
- JobConf conf, final IHyracksTaskContext ctx, int[] outputColumnsOffset) throws AlgebricksException {
- try {
- conf.setClassLoader(this.getClass().getClassLoader());
-
- inputFormat = (InputFormat) ReflectionUtils.newInstance(Class.forName(inputFormatClass), conf);
- job = conf;
-
- // initialize the input serde
- serDe = (SerDe) ReflectionUtils.newInstance(Class.forName(serDeClass), job);
- serDe.initialize(job, tbl);
-
- // initialize the output serde
- outputSerDe = (SerDe) ReflectionUtils.newInstance(Class.forName(outputSerDeClass), job);
- outputSerDe.initialize(job, tbl);
-
- // object inspector of the row
- objectInspector = serDe.getObjectInspector();
-
- // hyracks context
- this.ctx = ctx;
- this.outputColumnsOffset = outputColumnsOffset;
-
- if (objectInspector instanceof LazySimpleStructObjectInspector) {
- LazySimpleStructObjectInspector rowInspector = (LazySimpleStructObjectInspector) objectInspector;
- List<? extends StructField> fieldRefs = rowInspector.getAllStructFieldRefs();
- boolean lightWeightParsable = true;
- for (StructField fieldRef : fieldRefs) {
- Category category = fieldRef.getFieldObjectInspector().getCategory();
- if (!(category == Category.PRIMITIVE)) {
- lightWeightParsable = false;
- break;
- }
- }
- if (lightWeightParsable)
- parser = new TextToBinaryTupleParser(this.outputColumnsOffset, this.objectInspector);
- }
- } catch (Exception e) {
- throw new AlgebricksException(e);
- }
- }
-
- /**
- * parse a input HDFS file split, the result is send to the writer
- * one-frame-a-time
- *
- * @param split
- * the HDFS file split
- * @param writer
- * the writer
- * @throws HyracksDataException
- * if there is sth. wrong in the ser/de
- */
- @Override
- public void parse(FileSplit split, IFrameWriter writer) throws HyracksDataException {
- try {
- StructObjectInspector structInspector = (StructObjectInspector) objectInspector;
-
- // create the reader, key, and value
- RecordReader reader = inputFormat.getRecordReader(split, job, Reporter.NULL);
- Object key = reader.createKey();
- Object value = reader.createValue();
-
- // allocate a new frame
- ByteBuffer frame = ctx.allocateFrame();
- FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
- appender.reset(frame, true);
-
- List<? extends StructField> fieldRefs = structInspector.getAllStructFieldRefs();
- int size = 0;
- for (int i = 0; i < outputColumnsOffset.length; i++)
- if (outputColumnsOffset[i] >= 0)
- size++;
-
- ArrayTupleBuilder tb = new ArrayTupleBuilder(size);
- DataOutput dos = tb.getDataOutput();
- StructField[] outputFieldRefs = new StructField[size];
- Object[] outputFields = new Object[size];
- for (int i = 0; i < outputColumnsOffset.length; i++)
- if (outputColumnsOffset[i] >= 0)
- outputFieldRefs[outputColumnsOffset[i]] = fieldRefs.get(i);
-
- while (reader.next(key, value)) {
- // reuse the tuple builder
- tb.reset();
- if (parser != null) {
- Text text = (Text) value;
- parser.parse(text.getBytes(), 0, text.getLength(), tb);
- } else {
- Object row = serDe.deserialize((Writable) value);
- // write fields to the tuple builder one by one
- int i = 0;
- for (StructField fieldRef : fieldRefs) {
- if (outputColumnsOffset[i] >= 0)
- outputFields[outputColumnsOffset[i]] = structInspector.getStructFieldData(row, fieldRef);
- i++;
- }
-
- i = 0;
- for (Object field : outputFields) {
- BytesWritable fieldWritable = (BytesWritable) outputSerDe.serialize(field,
- outputFieldRefs[i].getFieldObjectInspector());
- dos.write(fieldWritable.getBytes(), 0, fieldWritable.getSize());
- tb.addFieldEndOffset();
- i++;
- }
- }
-
- if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
- if (appender.getTupleCount() <= 0)
- throw new IllegalStateException("zero tuples in a frame!");
- FrameUtils.flushFrame(frame, writer);
- appender.reset(frame, true);
- if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
- throw new IllegalStateException();
- }
- }
- }
- reader.close();
- System.gc();
-
- // flush the last frame
- if (appender.getTupleCount() > 0) {
- FrameUtils.flushFrame(frame, writer);
- }
- } catch (IOException e) {
- throw new HyracksDataException(e);
- } catch (SerDeException e) {
- throw new HyracksDataException(e);
- }
- }
-}
diff --git a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/operator/filescan/HiveTupleParserFactory.java b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/operator/filescan/HiveTupleParserFactory.java
deleted file mode 100644
index 98d730f..0000000
--- a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/operator/filescan/HiveTupleParserFactory.java
+++ /dev/null
@@ -1,105 +0,0 @@
-package edu.uci.ics.hivesterix.runtime.operator.filescan;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.OutputStreamWriter;
-import java.io.PrintWriter;
-import java.util.Properties;
-import java.util.UUID;
-
-import org.apache.hadoop.hive.ql.plan.PartitionDesc;
-import org.apache.hadoop.mapred.JobConf;
-
-import edu.uci.ics.hivesterix.serde.lazy.LazySerDe;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.dataflow.std.file.ITupleParser;
-import edu.uci.ics.hyracks.dataflow.std.file.ITupleParserFactory;
-
-@SuppressWarnings("deprecation")
-public class HiveTupleParserFactory implements ITupleParserFactory {
-
- private static final long serialVersionUID = 1L;
-
- private int[] outputColumns;
-
- private String outputSerDeClass = LazySerDe.class.getName();
-
- private String inputSerDeClass;
-
- private transient JobConf conf;
-
- private Properties tbl;
-
- private String confContent;
-
- private String inputFormatClass;
-
- public HiveTupleParserFactory(PartitionDesc desc, JobConf conf, int[] outputColumns) {
- this.conf = conf;
- tbl = desc.getProperties();
- inputFormatClass = (String) tbl.getProperty("file.inputformat");
- inputSerDeClass = (String) tbl.getProperty("serialization.lib");
- this.outputColumns = outputColumns;
-
- writeConfContent();
- }
-
- @Override
- public ITupleParser createTupleParser(IHyracksTaskContext ctx) {
- readConfContent();
- try {
- return new HiveTupleParser(inputFormatClass, inputSerDeClass, outputSerDeClass, tbl, conf, ctx,
- outputColumns);
- } catch (Exception e) {
- e.printStackTrace();
- return null;
- }
- }
-
- private void writeConfContent() {
- File dir = new File("hadoop-conf-tmp");
- if (!dir.exists()) {
- dir.mkdir();
- }
-
- String fileName = "hadoop-conf-tmp/" + UUID.randomUUID() + System.currentTimeMillis() + ".xml";
- try {
- DataOutputStream out = new DataOutputStream(new FileOutputStream(new File(fileName)));
- conf.writeXml(out);
- out.close();
-
- DataInputStream in = new DataInputStream(new FileInputStream(fileName));
- StringBuffer buffer = new StringBuffer();
- String line;
- while ((line = in.readLine()) != null) {
- buffer.append(line + "\n");
- }
- in.close();
- confContent = buffer.toString();
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-
- private void readConfContent() {
- File dir = new File("hadoop-conf-tmp");
- if (!dir.exists()) {
- dir.mkdir();
- }
-
- String fileName = "hadoop-conf-tmp/" + UUID.randomUUID() + System.currentTimeMillis() + ".xml";
- try {
- PrintWriter out = new PrintWriter((new OutputStreamWriter(new FileOutputStream(new File(fileName)))));
- out.write(confContent);
- out.close();
-
- conf = new JobConf(fileName);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-
-}
diff --git a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/operator/filescan/Partition.java b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/operator/filescan/Partition.java
deleted file mode 100644
index 6742a34..0000000
--- a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/operator/filescan/Partition.java
+++ /dev/null
@@ -1,35 +0,0 @@
-package edu.uci.ics.hivesterix.runtime.operator.filescan;
-
-import java.io.IOException;
-import java.io.Serializable;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.FileSplit;
-
-@SuppressWarnings("deprecation")
-public class Partition implements Serializable {
- private static final long serialVersionUID = 1L;
-
- private String uri;
- private long offset;
- private long length;
- private String[] locations;
-
- public Partition() {
- }
-
- public Partition(FileSplit file) {
- uri = file.getPath().toUri().toString();
- offset = file.getStart();
- length = file.getLength();
- try {
- locations = file.getLocations();
- } catch (IOException e) {
- throw new IllegalStateException(e);
- }
- }
-
- public FileSplit toFileSplit() {
- return new FileSplit(new Path(uri), offset, length, locations);
- }
-}
diff --git a/hivesterix/hivesterix-runtime/src/test/java/edu/uci/ics/hyracks/AppTest.java b/hivesterix/hivesterix-runtime/src/test/java/edu/uci/ics/hyracks/AppTest.java
deleted file mode 100644
index 0c701c8..0000000
--- a/hivesterix/hivesterix-runtime/src/test/java/edu/uci/ics/hyracks/AppTest.java
+++ /dev/null
@@ -1,38 +0,0 @@
-package edu.uci.ics.hyracks;
-
-import junit.framework.Test;
-import junit.framework.TestCase;
-import junit.framework.TestSuite;
-
-/**
- * Unit test for simple App.
- */
-public class AppTest
- extends TestCase
-{
- /**
- * Create the test case
- *
- * @param testName name of the test case
- */
- public AppTest( String testName )
- {
- super( testName );
- }
-
- /**
- * @return the suite of tests being tested
- */
- public static Test suite()
- {
- return new TestSuite( AppTest.class );
- }
-
- /**
- * Rigourous Test :-)
- */
- public void testApp()
- {
- assertTrue( true );
- }
-}
diff --git a/hivesterix/hivesterix-serde/src/test/java/edu/uci/ics/hyracks/AppTest.java b/hivesterix/hivesterix-serde/src/test/java/edu/uci/ics/hyracks/AppTest.java
deleted file mode 100644
index 0c701c8..0000000
--- a/hivesterix/hivesterix-serde/src/test/java/edu/uci/ics/hyracks/AppTest.java
+++ /dev/null
@@ -1,38 +0,0 @@
-package edu.uci.ics.hyracks;
-
-import junit.framework.Test;
-import junit.framework.TestCase;
-import junit.framework.TestSuite;
-
-/**
- * Unit test for simple App.
- */
-public class AppTest
- extends TestCase
-{
- /**
- * Create the test case
- *
- * @param testName name of the test case
- */
- public AppTest( String testName )
- {
- super( testName );
- }
-
- /**
- * @return the suite of tests being tested
- */
- public static Test suite()
- {
- return new TestSuite( AppTest.class );
- }
-
- /**
- * Rigourous Test :-)
- */
- public void testApp()
- {
- assertTrue( true );
- }
-}
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/IKeyValueParser.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/IKeyValueParser.java
index 5923e1e..5d35ec5 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/IKeyValueParser.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/IKeyValueParser.java
@@ -29,12 +29,24 @@
public interface IKeyValueParser<K, V> {
/**
+ * Initialize the key value parser.
+ *
+ * @param writer
+ * The hyracks writer for outputting data.
+ * @throws HyracksDataException
+ */
+ public void open(IFrameWriter writer) throws HyracksDataException;
+
+ /**
* Parse a key-value pair returned by HDFS record reader to a tuple.
* when the parsers' internal buffer is full, it can flush the buffer to the writer
*
* @param key
+ * The key returned from Hadoop's InputReader.
* @param value
+ * The value returned from Hadoop's InputReader.
* @param writer
+ * The hyracks writer for outputting data.
* @throws HyracksDataException
*/
public void parse(K key, V value, IFrameWriter writer) throws HyracksDataException;
@@ -44,7 +56,8 @@
* This method is called in the close() of HDFSReadOperatorDescriptor.
*
* @param writer
+ * The hyracks writer for outputting data.
* @throws HyracksDataException
*/
- public void flush(IFrameWriter writer) throws HyracksDataException;
+ public void close(IFrameWriter writer) throws HyracksDataException;
}
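
The flush() -> close() rename plus the new open() give parsers a full open/parse/close lifecycle, driven once per task by HDFSReadOperatorDescriptor. Below is a minimal sketch of a custom parser against the revised interface; buffer handling mirrors TextKeyValueParserFactory later in this diff, and the class name is hypothetical.

// Sketch of an IKeyValueParser under the revised open/parse/close contract (illustrative only).
import java.nio.ByteBuffer;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
import edu.uci.ics.hyracks.hdfs.api.IKeyValueParser;

public class TextLineParser implements IKeyValueParser<LongWritable, Text> {
    private final IHyracksTaskContext ctx;
    private final ArrayTupleBuilder tb = new ArrayTupleBuilder(1);
    private FrameTupleAppender appender;
    private ByteBuffer buffer;

    public TextLineParser(IHyracksTaskContext ctx) {
        this.ctx = ctx;
    }

    @Override
    public void open(IFrameWriter writer) throws HyracksDataException {
        // called once, before any parse() calls
        buffer = ctx.allocateFrame();
        appender = new FrameTupleAppender(ctx.getFrameSize());
        appender.reset(buffer, true);
    }

    @Override
    public void parse(LongWritable key, Text value, IFrameWriter writer) throws HyracksDataException {
        // one tuple per input line; the key (byte offset) is ignored
        tb.reset();
        tb.addField(value.getBytes(), 0, value.getLength());
        if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
            FrameUtils.flushFrame(buffer, writer);    // frame full: ship it and retry
            appender.reset(buffer, true);
            if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
                throw new HyracksDataException("tuple too large for a frame");
            }
        }
    }

    @Override
    public void close(IFrameWriter writer) throws HyracksDataException {
        // flush residual tuples after all splits have been read
        if (appender.getTupleCount() > 0) {
            FrameUtils.flushFrame(buffer, writer);
        }
    }
}
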
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/IKeyValueParserFactory.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/IKeyValueParserFactory.java
index 6e943ad..7d6f868 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/IKeyValueParserFactory.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/IKeyValueParserFactory.java
@@ -18,6 +18,7 @@
import java.io.Serializable;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
/**
* Users need to implement this interface to use the HDFSReadOperatorDescriptor.
@@ -36,6 +37,6 @@
* the IHyracksTaskContext
* @return a key-value parser instance.
*/
- public IKeyValueParser<K, V> createKeyValueParser(IHyracksTaskContext ctx);
+ public IKeyValueParser<K, V> createKeyValueParser(IHyracksTaskContext ctx) throws HyracksDataException;
}
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/ITupleWriter.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/ITupleWriter.java
index 25b9523..8e85627 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/ITupleWriter.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/ITupleWriter.java
@@ -26,6 +26,15 @@
public interface ITupleWriter {
/**
+ * Initialize the tuple writer.
+ *
+ * @param output
+ * The channel for output data.
+ * @throws HyracksDataException
+ */
+ public void open(DataOutput output) throws HyracksDataException;
+
+ /**
* Write the tuple to the DataOutput.
*
* @param output
@@ -36,4 +45,13 @@
*/
public void write(DataOutput output, ITupleReference tuple) throws HyracksDataException;
+ /**
+ * Close the writer.
+ *
+ * @param output
+ * The channel for output data.
+ * @throws HyracksDataException
+ */
+ public void close(DataOutput output) throws HyracksDataException;
+
}
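
Symmetrically, ITupleWriter gains open()/close() around write(). A minimal sketch of an implementation modeled on TextTupleWriterFactory later in this diff; the class name and the write-all-fields-as-text layout are assumptions.

// Sketch of an ITupleWriter under the new open/write/close contract (illustrative only).
import java.io.DataOutput;

import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
import edu.uci.ics.hyracks.hdfs.api.ITupleWriter;

public class TextFieldTupleWriter implements ITupleWriter {
    private final byte newLine = "\n".getBytes()[0];

    @Override
    public void open(DataOutput output) throws HyracksDataException {
        // nothing to initialize for plain text output
    }

    @Override
    public void write(DataOutput output, ITupleReference tuple) throws HyracksDataException {
        try {
            // write every field's raw bytes, then a newline per tuple
            for (int i = 0; i < tuple.getFieldCount(); i++) {
                output.write(tuple.getFieldData(i), tuple.getFieldStart(i), tuple.getFieldLength(i));
            }
            output.writeByte(newLine);
        } catch (Exception e) {
            throw new HyracksDataException(e);
        }
    }

    @Override
    public void close(DataOutput output) throws HyracksDataException {
        // nothing buffered in this sketch; the operator owns the output stream
    }
}
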
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/ITupleWriterFactory.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/ITupleWriterFactory.java
index 839de8f..9a025c2 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/ITupleWriterFactory.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/ITupleWriterFactory.java
@@ -17,14 +17,19 @@
import java.io.Serializable;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
/**
* Users need to implement this interface to use the HDFSWriteOperatorDescriptor.
*/
public interface ITupleWriterFactory extends Serializable {
/**
+ * @param ctx
+ * the IHyracksTaskContext
* @return a tuple writer instance
*/
- public ITupleWriter getTupleWriter();
+ public ITupleWriter getTupleWriter(IHyracksTaskContext ctx) throws HyracksDataException;
}
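
A factory for the sketch above would simply thread the task context through (unused here); this hypothetical pairing goes with the TextFieldTupleWriter sketch shown earlier.

// Hypothetical factory matching the revised getTupleWriter(ctx) signature (illustrative only).
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.hdfs.api.ITupleWriter;
import edu.uci.ics.hyracks.hdfs.api.ITupleWriterFactory;

public class TextFieldTupleWriterFactory implements ITupleWriterFactory {
    private static final long serialVersionUID = 1L;

    @Override
    public ITupleWriter getTupleWriter(IHyracksTaskContext ctx) throws HyracksDataException {
        return new TextFieldTupleWriter(); // ctx is available for frame-size or resource needs
    }
}
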
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSReadOperatorDescriptor.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSReadOperatorDescriptor.java
index e924650..f49688b 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSReadOperatorDescriptor.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSReadOperatorDescriptor.java
@@ -102,6 +102,7 @@
JobConf conf = confFactory.getConf();
IKeyValueParser parser = tupleParserFactory.createKeyValueParser(ctx);
writer.open();
+ parser.open(writer);
InputFormat inputFormat = conf.getInputFormat();
for (int i = 0; i < inputSplits.length; i++) {
/**
@@ -131,7 +132,7 @@
}
}
}
- parser.flush(writer);
+ parser.close(writer);
writer.close();
} catch (Exception e) {
throw new HyracksDataException(e);
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSWriteOperatorDescriptor.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSWriteOperatorDescriptor.java
index ff97a29..36d5f55 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSWriteOperatorDescriptor.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSWriteOperatorDescriptor.java
@@ -89,7 +89,7 @@
String outputDirPath = FileOutputFormat.getOutputPath(conf).toString();
String fileName = outputDirPath + File.separator + "part-" + partition;
- tupleWriter = tupleWriterFactory.getTupleWriter();
+ tupleWriter = tupleWriterFactory.getTupleWriter(ctx);
try {
FileSystem dfs = FileSystem.get(conf);
dos = dfs.create(new Path(fileName), true);
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/lib/TextKeyValueParserFactory.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/lib/TextKeyValueParserFactory.java
index c691f5d..9574bb4 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/lib/TextKeyValueParserFactory.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/lib/TextKeyValueParserFactory.java
@@ -43,6 +43,11 @@
return new IKeyValueParser<LongWritable, Text>() {
@Override
+ public void open(IFrameWriter writer) {
+
+ }
+
+ @Override
public void parse(LongWritable key, Text value, IFrameWriter writer) throws HyracksDataException {
tb.reset();
tb.addField(value.getBytes(), 0, value.getLength());
@@ -56,7 +61,7 @@
}
@Override
- public void flush(IFrameWriter writer) throws HyracksDataException {
+ public void close(IFrameWriter writer) throws HyracksDataException {
FrameUtils.flushFrame(buffer, writer);
}
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/lib/TextTupleWriterFactory.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/lib/TextTupleWriterFactory.java
index d26721d..0da14e5 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/lib/TextTupleWriterFactory.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/lib/TextTupleWriterFactory.java
@@ -17,6 +17,7 @@
import java.io.DataOutput;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
import edu.uci.ics.hyracks.hdfs.api.ITupleWriter;
@@ -26,9 +27,14 @@
private static final long serialVersionUID = 1L;
@Override
- public ITupleWriter getTupleWriter() {
+ public ITupleWriter getTupleWriter(IHyracksTaskContext ctx) {
return new ITupleWriter() {
- byte newLine = "\n".getBytes()[0];
+ private byte newLine = "\n".getBytes()[0];
+
+ @Override
+ public void open(DataOutput output) {
+
+ }
@Override
public void write(DataOutput output, ITupleReference tuple) throws HyracksDataException {
@@ -43,6 +49,11 @@
}
}
+ @Override
+ public void close(DataOutput output) {
+
+ }
+
};
}
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSReadOperatorDescriptor.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSReadOperatorDescriptor.java
index eb7b6e1..9e9abdf 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSReadOperatorDescriptor.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSReadOperatorDescriptor.java
@@ -147,7 +147,7 @@
}
}
}
- parser.flush(writer);
+ parser.close(writer);
writer.close();
} catch (Exception e) {
throw new HyracksDataException(e);
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSWriteOperatorDescriptor.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSWriteOperatorDescriptor.java
index 390a7b5..86ee527 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSWriteOperatorDescriptor.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSWriteOperatorDescriptor.java
@@ -88,7 +88,7 @@
String outputPath = FileOutputFormat.getOutputPath(conf).toString();
String fileName = outputPath + File.separator + "part-" + partition;
- tupleWriter = tupleWriterFactory.getTupleWriter();
+ tupleWriter = tupleWriterFactory.getTupleWriter(ctx);
try {
FileSystem dfs = FileSystem.get(conf.getConfiguration());
dos = dfs.create(new Path(fileName), true);
diff --git a/pregelix/pregelix-example/pom.xml b/pregelix/pregelix-example/pom.xml
index b3f78e1..e643331 100644
--- a/pregelix/pregelix-example/pom.xml
+++ b/pregelix/pregelix-example/pom.xml
@@ -77,6 +77,7 @@
</configuration>
</plugin>
<plugin>
+ <groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-clean-plugin</artifactId>
<version>2.5</version>
<configuration>