Subset hive-exec, make twitter4j a provided dep.
Change-Id: Iee4276f540ec8552181bfb452882654b5faa17df
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1427
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mblow@apache.org>
diff --git a/asterixdb/asterix-app/pom.xml b/asterixdb/asterix-app/pom.xml
index 0d991da..d4a9e5f 100644
--- a/asterixdb/asterix-app/pom.xml
+++ b/asterixdb/asterix-app/pom.xml
@@ -155,8 +155,6 @@
</ignoredUsedUndeclaredDependencies>
<usedDependencies>
<usedDependency>org.apache.hadoop:hadoop-common</usedDependency>
- <usedDependency>commons-lang:commons-lang</usedDependency>
- <usedDependency>com.google.guava:guava</usedDependency>
<usedDependency>org.apache.asterix:asterix-external-data</usedDependency>
</usedDependencies>
</configuration>
diff --git a/asterixdb/asterix-external-data/pom.xml b/asterixdb/asterix-external-data/pom.xml
index da01510..c603571 100644
--- a/asterixdb/asterix-external-data/pom.xml
+++ b/asterixdb/asterix-external-data/pom.xml
@@ -229,14 +229,21 @@
<version>${project.version}</version>
</dependency>
<dependency>
+ <groupId>org.apache.asterix</groupId>
+ <artifactId>asterix-hivecompat</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
<groupId>org.twitter4j</groupId>
<artifactId>twitter4j-core</artifactId>
<version>4.0.3</version>
+ <scope>provided</scope>
</dependency>
<dependency>
<groupId>org.twitter4j</groupId>
<artifactId>twitter4j-stream</artifactId>
<version>4.0.3</version>
+ <scope>provided</scope>
</dependency>
<dependency>
<groupId>net.java.dev.rome</groupId>
@@ -256,7 +263,7 @@
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
- <artifactId>hive-exec</artifactId>
+ <artifactId>hive-serde</artifactId>
<version>0.13.0</version>
</dependency>
<dependency>
@@ -393,5 +400,10 @@
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
+ <dependency>
+ <groupId>commons-lang</groupId>
+ <artifactId>commons-lang</artifactId>
+ <version>2.5</version>
+ </dependency>
</dependencies>
</project>
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/IndexingConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/IndexingConstants.java
index f315950..a269144 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/IndexingConstants.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/IndexingConstants.java
@@ -41,7 +41,7 @@
public static final String EXTERNAL_FILE_INDEX_NAME_SUFFIX = "FilesIndex";
public static final String KEY_INPUT_FORMAT = "input-format";
public static final String INPUT_FORMAT_RC = "rc-input-format";
- public static final String INPUT_FORMAT_RC_FULLY_QUALIFIED = "org.apache.hadoop.hive.ql.io.RCFileInputFormat";
+ public static final String INPUT_FORMAT_RC_FULLY_QUALIFIED = "org.apache.asterix.hivecompat.io.RCFileInputFormat";
//Field Types
public static final IAType FILE_NUMBER_FIELD_TYPE = BuiltinType.AINT32;
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/RCLookupReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/RCLookupReader.java
index 95d76ba..3f9d90e 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/RCLookupReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/RCLookupReader.java
@@ -27,7 +27,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.io.RCFile.Reader;
+import org.apache.asterix.hivecompat.io.RCFile.Reader;
import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;
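For context, a minimal read-loop sketch against the relocated RCFile.Reader that RCLookupReader now imports. It is not part of the patch; the file path and printed fields are illustrative assumptions, while the Reader calls (next, getCurrentRow, close) follow the Hive API this module subsets.

// Illustrative sketch, not part of this patch.
import org.apache.asterix.hivecompat.io.RCFile;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
import org.apache.hadoop.io.LongWritable;

public class RCFileReadSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Path file = new Path("/tmp/sample.rc"); // hypothetical input file
        RCFile.Reader reader = new RCFile.Reader(fs, file, conf);
        LongWritable rowId = new LongWritable();
        BytesRefArrayWritable row = new BytesRefArrayWritable();
        while (reader.next(rowId)) {      // advance to the next row
            reader.getCurrentRow(row);    // fill in the current row's column refs
            System.out.println("row " + rowId.get() + ": " + row.size() + " columns");
        }
        reader.close();
    }
}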
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
index 881c498..a89d13e 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
@@ -107,7 +107,7 @@
*/
public static final String CLASS_NAME_TEXT_INPUT_FORMAT = "org.apache.hadoop.mapred.TextInputFormat";
public static final String CLASS_NAME_SEQUENCE_INPUT_FORMAT = "org.apache.hadoop.mapred.SequenceFileInputFormat";
- public static final String CLASS_NAME_RC_INPUT_FORMAT = "org.apache.hadoop.hive.ql.io.RCFileInputFormat";
+ public static final String CLASS_NAME_RC_INPUT_FORMAT = "org.apache.asterix.hivecompat.io.RCFileInputFormat";
public static final String CLASS_NAME_HDFS_FILESYSTEM = "org.apache.hadoop.hdfs.DistributedFileSystem";
/**
* input formats aliases
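A hedged sketch of how a fully-qualified class-name constant such as CLASS_NAME_RC_INPUT_FORMAT is commonly resolved at runtime with Hadoop's ReflectionUtils. The factory wiring below is an assumption for illustration, not code from this change; only the constant's value comes from the patch.

// Illustrative sketch, not part of this patch.
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.ReflectionUtils;

public final class InputFormatFactorySketch {
    // Same value as the updated CLASS_NAME_RC_INPUT_FORMAT constant; duplicated to keep the sketch self-contained.
    private static final String CLASS_NAME_RC_INPUT_FORMAT =
            "org.apache.asterix.hivecompat.io.RCFileInputFormat";

    @SuppressWarnings("rawtypes")
    public static InputFormat createRCInputFormat(JobConf conf) throws ClassNotFoundException {
        Class<?> clazz = Class.forName(CLASS_NAME_RC_INPUT_FORMAT);
        // ReflectionUtils injects the JobConf when the class is Configurable or JobConfigurable.
        return (InputFormat) ReflectionUtils.newInstance(clazz, conf);
    }
}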
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
index 42b582b..9556054 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
@@ -37,7 +37,7 @@
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
+import org.apache.asterix.hivecompat.io.RCFileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
diff --git a/asterixdb/asterix-hivecompat/pom.xml b/asterixdb/asterix-hivecompat/pom.xml
new file mode 100644
index 0000000..1180763
--- /dev/null
+++ b/asterixdb/asterix-hivecompat/pom.xml
@@ -0,0 +1,72 @@
+<!--
+ ! Licensed to the Apache Software Foundation (ASF) under one
+ ! or more contributor license agreements. See the NOTICE file
+ ! distributed with this work for additional information
+ ! regarding copyright ownership. The ASF licenses this file
+ ! to you under the Apache License, Version 2.0 (the
+ ! "License"); you may not use this file except in compliance
+ ! with the License. You may obtain a copy of the License at
+ !
+ ! http://www.apache.org/licenses/LICENSE-2.0
+ !
+ ! Unless required by applicable law or agreed to in writing,
+ ! software distributed under the License is distributed on an
+ ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ! KIND, either express or implied. See the License for the
+ ! specific language governing permissions and limitations
+ ! under the License.
+ !-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <artifactId>apache-asterixdb</artifactId>
+ <groupId>org.apache.asterix</groupId>
+ <version>0.8.9-SNAPSHOT</version>
+ </parent>
+ <licenses>
+ <license>
+ <name>Apache License, Version 2.0</name>
+ <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
+ <distribution>repo</distribution>
+ <comments>A business-friendly OSS license</comments>
+ </license>
+ </licenses>
+ <artifactId>asterix-hivecompat</artifactId>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-checkstyle-plugin</artifactId>
+ <configuration>
+ <excludes>**/hivecompat/**/*</excludes>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-serde</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>commons-logging</groupId>
+ <artifactId>commons-logging-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hive.shims</groupId>
+ <artifactId>hive-shims-common</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-common</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-core</artifactId>
+ </dependency>
+ </dependencies>
+</project>
diff --git a/asterixdb/asterix-hivecompat/src/main/java/org/apache/asterix/hivecompat/io/CodecPool.java b/asterixdb/asterix-hivecompat/src/main/java/org/apache/asterix/hivecompat/io/CodecPool.java
new file mode 100644
index 0000000..aab4c72
--- /dev/null
+++ b/asterixdb/asterix-hivecompat/src/main/java/org/apache/asterix/hivecompat/io/CodecPool.java
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.asterix.hivecompat.io;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.io.compress.Decompressor;
+
+/**
+ * A global compressor/decompressor pool used to save and reuse (possibly
+ * native) compression/decompression codecs.
+ */
+public final class CodecPool {
+ private static final Log LOG = LogFactory.getLog(CodecPool.class);
+
+ /**
+ * A global compressor pool used to save the expensive
+ * construction/destruction of (possibly native) compression codecs.
+ */
+ private static final Map<Class<Compressor>, List<Compressor>> COMPRESSOR_POOL =
+ new HashMap<Class<Compressor>, List<Compressor>>();
+
+ /**
+ * A global decompressor pool used to save the expensive
+ * construction/destruction of (possibly native) decompression codecs.
+ */
+ private static final Map<Class<Decompressor>, List<Decompressor>> DECOMPRESSOR_POOL =
+ new HashMap<Class<Decompressor>, List<Decompressor>>();
+
+ private static <T> T borrow(Map<Class<T>, List<T>> pool,
+ Class<? extends T> codecClass) {
+ T codec = null;
+
+ // Check if an appropriate codec is available
+ synchronized (pool) {
+ if (pool.containsKey(codecClass)) {
+ List<T> codecList = pool.get(codecClass);
+
+ if (codecList != null) {
+ synchronized (codecList) {
+ if (!codecList.isEmpty()) {
+ codec = codecList.remove(codecList.size() - 1);
+ }
+ }
+ }
+ }
+ }
+
+ return codec;
+ }
+
+ private static <T> void payback(Map<Class<T>, List<T>> pool, T codec) {
+ if (codec != null) {
+ Class<T> codecClass = (Class<T>) codec.getClass();
+ synchronized (pool) {
+ if (!pool.containsKey(codecClass)) {
+ pool.put(codecClass, new ArrayList<T>());
+ }
+
+ List<T> codecList = pool.get(codecClass);
+ synchronized (codecList) {
+ codecList.add(codec);
+ }
+ }
+ }
+ }
+
+ /**
+ * Get a {@link Compressor} for the given {@link CompressionCodec} from the
+ * pool or a new one.
+ *
+ * @param codec
+ * the <code>CompressionCodec</code> for which to get the
+ * <code>Compressor</code>
+ * @return <code>Compressor</code> for the given <code>CompressionCodec</code>
+ * from the pool or a new one
+ */
+ public static Compressor getCompressor(CompressionCodec codec) {
+ Compressor compressor = borrow(COMPRESSOR_POOL, codec.getCompressorType());
+ if (compressor == null) {
+ compressor = codec.createCompressor();
+ LOG.info("Got brand-new compressor");
+ } else {
+ LOG.debug("Got recycled compressor");
+ }
+ return compressor;
+ }
+
+ /**
+ * Get a {@link Decompressor} for the given {@link CompressionCodec} from the
+ * pool or a new one.
+ *
+ * @param codec
+ * the <code>CompressionCodec</code> for which to get the
+ * <code>Decompressor</code>
+ * @return <code>Decompressor</code> for the given
+ * <code>CompressionCodec</code> from the pool or a new one
+ */
+ public static Decompressor getDecompressor(CompressionCodec codec) {
+ Decompressor decompressor = borrow(DECOMPRESSOR_POOL, codec
+ .getDecompressorType());
+ if (decompressor == null) {
+ decompressor = codec.createDecompressor();
+ LOG.info("Got brand-new decompressor");
+ } else {
+ LOG.debug("Got recycled decompressor");
+ }
+ return decompressor;
+ }
+
+ /**
+ * Return the {@link Compressor} to the pool.
+ *
+ * @param compressor
+ * the <code>Compressor</code> to be returned to the pool
+ */
+ public static void returnCompressor(Compressor compressor) {
+ if (compressor == null) {
+ return;
+ }
+ compressor.reset();
+ payback(COMPRESSOR_POOL, compressor);
+ }
+
+ /**
+ * Return the {@link Decompressor} to the pool.
+ *
+ * @param decompressor
+ * the <code>Decompressor</code> to be returned to the pool
+ */
+ public static void returnDecompressor(Decompressor decompressor) {
+ if (decompressor == null) {
+ return;
+ }
+ decompressor.reset();
+ payback(DECOMPRESSOR_POOL, decompressor);
+ }
+
+ private CodecPool() {
+ // prevent instantiation
+ }
+}
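A short usage sketch for the CodecPool added above, assuming a GzipCodec instantiated through ReflectionUtils so it picks up the Configuration. The point is the borrow/return pairing; the surrounding stream handling is elided.

// Illustrative sketch, not part of this patch.
import org.apache.asterix.hivecompat.io.CodecPool;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.util.ReflectionUtils;

public class CodecPoolSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Codecs need a Configuration, hence ReflectionUtils rather than plain 'new'.
        GzipCodec codec = ReflectionUtils.newInstance(GzipCodec.class, conf);
        Decompressor decompressor = CodecPool.getDecompressor(codec);
        try {
            // ... pass the decompressor to codec.createInputStream(in, decompressor) ...
        } finally {
            CodecPool.returnDecompressor(decompressor); // resets it and puts it back in the pool
        }
    }
}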
diff --git a/asterixdb/asterix-hivecompat/src/main/java/org/apache/asterix/hivecompat/io/InputFormatChecker.java b/asterixdb/asterix-hivecompat/src/main/java/org/apache/asterix/hivecompat/io/InputFormatChecker.java
new file mode 100644
index 0000000..4ac8a59
--- /dev/null
+++ b/asterixdb/asterix-hivecompat/src/main/java/org/apache/asterix/hivecompat/io/InputFormatChecker.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.asterix.hivecompat.io;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hive.conf.HiveConf;
+
+/**
+ * Check for validity of the input files.
+ */
+public interface InputFormatChecker {
+
+ /**
+ * This method is used to validate the input files.
+ *
+ */
+ boolean validateInput(FileSystem fs, HiveConf conf,
+ ArrayList<FileStatus> files) throws IOException;
+
+}
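A sketch of what an InputFormatChecker implementation could look like, using a hypothetical non-empty-file rule for illustration; a real checker would normally open each file and verify its header.

// Illustrative sketch, not part of this patch.
import java.io.IOException;
import java.util.ArrayList;

import org.apache.asterix.hivecompat.io.InputFormatChecker;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hive.conf.HiveConf;

public class NonEmptyFileChecker implements InputFormatChecker {
    @Override
    public boolean validateInput(FileSystem fs, HiveConf conf, ArrayList<FileStatus> files)
            throws IOException {
        if (files.isEmpty()) {
            return false;
        }
        for (FileStatus file : files) {
            if (file.getLen() == 0) { // an empty file cannot carry a valid header
                return false;
            }
        }
        return true;
    }
}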
diff --git a/asterixdb/asterix-hivecompat/src/main/java/org/apache/asterix/hivecompat/io/NonSyncDataInputBuffer.java b/asterixdb/asterix-hivecompat/src/main/java/org/apache/asterix/hivecompat/io/NonSyncDataInputBuffer.java
new file mode 100644
index 0000000..8cb890b
--- /dev/null
+++ b/asterixdb/asterix-hivecompat/src/main/java/org/apache/asterix/hivecompat/io/NonSyncDataInputBuffer.java
@@ -0,0 +1,512 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.asterix.hivecompat.io;
+
+import java.io.DataInput;
+import java.io.EOFException;
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.PushbackInputStream;
+import java.io.UTFDataFormatException;
+
+import org.apache.hadoop.hive.common.io.NonSyncByteArrayInputStream;
+
+/**
+ * A non-thread-safe version of Hadoop's DataInputBuffer, which removes all
+ * synchronized modifiers.
+ */
+public class NonSyncDataInputBuffer extends FilterInputStream implements
+ DataInput {
+
+ private final NonSyncByteArrayInputStream buffer;
+
+ byte[] buff = new byte[16];
+
+ /** Constructs a new empty buffer. */
+ public NonSyncDataInputBuffer() {
+ this(new NonSyncByteArrayInputStream());
+ }
+
+ private NonSyncDataInputBuffer(NonSyncByteArrayInputStream buffer) {
+ super(buffer);
+ this.buffer = buffer;
+ }
+
+ /** Resets the data that the buffer reads. */
+ public void reset(byte[] input, int length) {
+ buffer.reset(input, 0, length);
+ }
+
+ /** Resets the data that the buffer reads. */
+ public void reset(byte[] input, int start, int length) {
+ buffer.reset(input, start, length);
+ }
+
+ /** Returns the current position in the input. */
+ public int getPosition() {
+ return buffer.getPosition();
+ }
+
+ /** Returns the length of the input. */
+ public int getLength() {
+ return buffer.getLength();
+ }
+
+ /**
+ * Reads bytes from the source stream into the byte array <code>buffer</code>.
+ * The number of bytes actually read is returned.
+ *
+ * @param buffer
+ * the buffer to read bytes into
+ * @return the number of bytes actually read or -1 if end of stream.
+ *
+ * @throws IOException
+ * If a problem occurs reading from this DataInputStream.
+ *
+ */
+ @Override
+ public final int read(byte[] buffer) throws IOException {
+ return in.read(buffer, 0, buffer.length);
+ }
+
+ /**
+ * Reads at most <code>length</code> bytes from this DataInputStream and stores
+ * them in byte array <code>buffer</code> starting at <code>offset</code>.
+ * Returns the number of bytes actually read, or -1 if no bytes were read and
+ * end of stream was encountered.
+ *
+ * @param buffer
+ * the byte array in which to store the read bytes.
+ * @param offset
+ * the offset in <code>buffer</code> to store the read bytes.
+ * @param length
+ * the maximum number of bytes to store in <code>buffer</code>.
+ * @return the number of bytes actually read or -1 if end of stream.
+ *
+ * @throws IOException
+ * If a problem occurs reading from this DataInputStream.
+ *
+ */
+ @Override
+ public final int read(byte[] buffer, int offset, int length)
+ throws IOException {
+ return in.read(buffer, offset, length);
+ }
+
+ /**
+ * Reads a boolean from this stream.
+ *
+ * @return the next boolean value from the source stream.
+ *
+ * @throws IOException
+ * If a problem occurs reading from this DataInputStream.
+ *
+ */
+ @Override
+ public final boolean readBoolean() throws IOException {
+ int temp = in.read();
+ if (temp < 0) {
+ throw new EOFException();
+ }
+ return temp != 0;
+ }
+
+ /**
+ * Reads an 8-bit byte value from this stream.
+ *
+ * @return the next byte value from the source stream.
+ *
+ * @throws IOException
+ * If a problem occurs reading from this DataInputStream.
+ *
+ */
+ @Override
+ public final byte readByte() throws IOException {
+ int temp = in.read();
+ if (temp < 0) {
+ throw new EOFException();
+ }
+ return (byte) temp;
+ }
+
+ /**
+ * Reads a 16-bit character value from this stream.
+ *
+ * @return the next <code>char</code> value from the source stream.
+ *
+ * @throws IOException
+ * If a problem occurs reading from this DataInputStream.
+ *
+ */
+ private int readToBuff(int count) throws IOException {
+ int offset = 0;
+
+ while (offset < count) {
+ int bytesRead = in.read(buff, offset, count - offset);
+ if (bytesRead == -1) {
+ return bytesRead;
+ }
+ offset += bytesRead;
+ }
+ return offset;
+ }
+
+ @Override
+ public final char readChar() throws IOException {
+ if (readToBuff(2) < 0) {
+ throw new EOFException();
+ }
+ return (char) (((buff[0] & 0xff) << 8) | (buff[1] & 0xff));
+
+ }
+
+ /**
+ * Reads a 64-bit <code>double</code> value from this stream.
+ *
+ * @return the next <code>double</code> value from the source stream.
+ *
+ * @throws IOException
+ * If a problem occurs reading from this DataInputStream.
+ *
+ */
+ @Override
+ public final double readDouble() throws IOException {
+ return Double.longBitsToDouble(readLong());
+ }
+
+ /**
+ * Reads a 32-bit <code>float</code> value from this stream.
+ *
+ * @return the next <code>float</code> value from the source stream.
+ *
+ * @throws IOException
+ * If a problem occurs reading from this DataInputStream.
+ *
+ */
+ @Override
+ public final float readFloat() throws IOException {
+ return Float.intBitsToFloat(readInt());
+ }
+
+ /**
+ * Reads bytes from this stream into the byte array <code>buffer</code>. This
+ * method will block until <code>buffer.length</code> number of bytes have
+ * been read.
+ *
+ * @param buffer
+ * to read bytes into
+ *
+ * @throws IOException
+ * If a problem occurs reading from this DataInputStream.
+ *
+ */
+ @Override
+ public final void readFully(byte[] buffer) throws IOException {
+ readFully(buffer, 0, buffer.length);
+ }
+
+ /**
+ * Reads bytes from this stream and stores them in the byte array
+ * <code>buffer</code> starting at the position <code>offset</code>. This
+ * method blocks until <code>count</code> bytes have been read.
+ *
+ * @param buffer
+ * the byte array into which the data is read
+ * @param offset
+ * the offset the operation start at
+ * @param length
+ * the maximum number of bytes to read
+ *
+ * @throws IOException
+ * if a problem occurs while reading from this stream
+ * @throws EOFException
+ * if reaches the end of the stream before enough bytes have been
+ * read
+ */
+ @Override
+ public final void readFully(byte[] buffer, int offset, int length)
+ throws IOException {
+ if (length < 0) {
+ throw new IndexOutOfBoundsException();
+ }
+ if (length == 0) {
+ return;
+ }
+ if (in == null || buffer == null) {
+ throw new NullPointerException("Null Pointer to underlying input stream");
+ }
+
+ if (offset < 0 || offset > buffer.length - length) {
+ throw new IndexOutOfBoundsException();
+ }
+ while (length > 0) {
+ int result = in.read(buffer, offset, length);
+ if (result < 0) {
+ throw new EOFException();
+ }
+ offset += result;
+ length -= result;
+ }
+ }
+
+ /**
+ * Reads a 32-bit integer value from this stream.
+ *
+ * @return the next <code>int</code> value from the source stream.
+ *
+ * @throws IOException
+ * If a problem occurs reading from this DataInputStream.
+ *
+ */
+ @Override
+ public final int readInt() throws IOException {
+ if (readToBuff(4) < 0) {
+ throw new EOFException();
+ }
+ return ((buff[0] & 0xff) << 24) | ((buff[1] & 0xff) << 16)
+ | ((buff[2] & 0xff) << 8) | (buff[3] & 0xff);
+ }
+
+ /**
+ * Returns a <code>String</code> representing the next line of text available
+ * in this stream. A line is represented by 0 or more characters
+ * followed by <code>'\n'</code>, <code>'\r'</code>, <code>"\n\r"</code> or
+ * end of stream. The <code>String</code> does not include the newline
+ * sequence.
+ *
+ * @return the contents of the line or null if no characters were read before
+ * end of stream.
+ *
+ * @throws IOException
+ * If the DataInputStream is already closed or some other IO error
+ * occurs.
+ *
+ * @deprecated Use BufferedReader
+ */
+ @Deprecated
+ @Override
+ public final String readLine() throws IOException {
+ StringBuilder line = new StringBuilder(80); // Typical line length
+ boolean foundTerminator = false;
+ while (true) {
+ int nextByte = in.read();
+ switch (nextByte) {
+ case -1:
+ if (line.length() == 0 && !foundTerminator) {
+ return null;
+ }
+ return line.toString();
+ case (byte) '\r':
+ if (foundTerminator) {
+ ((PushbackInputStream) in).unread(nextByte);
+ return line.toString();
+ }
+ foundTerminator = true;
+ /* Have to be able to peek ahead one byte */
+ if (!(in.getClass() == PushbackInputStream.class)) {
+ in = new PushbackInputStream(in);
+ }
+ break;
+ case (byte) '\n':
+ return line.toString();
+ default:
+ if (foundTerminator) {
+ ((PushbackInputStream) in).unread(nextByte);
+ return line.toString();
+ }
+ line.append((char) nextByte);
+ }
+ }
+ }
+
+ /**
+ * Reads a 64-bit <code>long</code> value from this stream.
+ *
+ * @return the next <code>long</code> value from the source stream.
+ *
+ * @throws IOException
+ * If a problem occurs reading from this DataInputStream.
+ *
+ */
+ @Override
+ public final long readLong() throws IOException {
+ if (readToBuff(8) < 0) {
+ throw new EOFException();
+ }
+ int i1 = ((buff[0] & 0xff) << 24) | ((buff[1] & 0xff) << 16)
+ | ((buff[2] & 0xff) << 8) | (buff[3] & 0xff);
+ int i2 = ((buff[4] & 0xff) << 24) | ((buff[5] & 0xff) << 16)
+ | ((buff[6] & 0xff) << 8) | (buff[7] & 0xff);
+
+ return ((i1 & 0xffffffffL) << 32) | (i2 & 0xffffffffL);
+ }
+
+ /**
+ * Reads a 16-bit <code>short</code> value from this stream.
+ *
+ * @return the next <code>short</code> value from the source stream.
+ *
+ * @throws IOException
+ * If a problem occurs reading from this DataInputStream.
+ *
+ */
+ @Override
+ public final short readShort() throws IOException {
+ if (readToBuff(2) < 0) {
+ throw new EOFException();
+ }
+ return (short) (((buff[0] & 0xff) << 8) | (buff[1] & 0xff));
+ }
+
+ /**
+ * Reads an unsigned 8-bit <code>byte</code> value from this stream and
+ * returns it as an int.
+ *
+ * @return the next unsigned byte value from the source stream.
+ *
+ * @throws IOException
+ * If a problem occurs reading from this DataInputStream.
+ *
+ */
+ @Override
+ public final int readUnsignedByte() throws IOException {
+ int temp = in.read();
+ if (temp < 0) {
+ throw new EOFException();
+ }
+ return temp;
+ }
+
+ /**
+ * Reads a 16-bit unsigned <code>short</code> value from this stream and
+ * returns it as an int.
+ *
+ * @return the next unsigned <code>short</code> value from the source stream.
+ *
+ * @throws IOException
+ * If a problem occurs reading from this DataInputStream.
+ *
+ */
+ @Override
+ public final int readUnsignedShort() throws IOException {
+ if (readToBuff(2) < 0) {
+ throw new EOFException();
+ }
+ return (char) (((buff[0] & 0xff) << 8) | (buff[1] & 0xff));
+ }
+
+ /**
+ * Reads a UTF format String from this Stream.
+ *
+ * @return the next UTF String from the source stream.
+ *
+ * @throws IOException
+ * If a problem occurs reading from this DataInputStream.
+ *
+ */
+ @Override
+ public final String readUTF() throws IOException {
+ return decodeUTF(readUnsignedShort());
+ }
+
+ String decodeUTF(int utfSize) throws IOException {
+ return decodeUTF(utfSize, this);
+ }
+
+ private static String decodeUTF(int utfSize, DataInput in) throws IOException {
+ byte[] buf = new byte[utfSize];
+ char[] out = new char[utfSize];
+ in.readFully(buf, 0, utfSize);
+
+ return convertUTF8WithBuf(buf, out, 0, utfSize);
+ }
+
+ /**
+ * Reads a UTF format String from the DataInput Stream <code>in</code>.
+ *
+ * @param in
+ * the input stream to read from
+ * @return the next UTF String from the source stream.
+ *
+ * @throws IOException
+ * If a problem occurs reading from this DataInputStream.
+ *
+ */
+ public static final String readUTF(DataInput in) throws IOException {
+ return decodeUTF(in.readUnsignedShort(), in);
+ }
+
+ /**
+ * Skips <code>count</code> number of bytes in this stream. Subsequent
+ * <code>read()</code>'s will not return these bytes unless
+ * <code>reset()</code> is used.
+ *
+ * @param count
+ * the number of bytes to skip.
+ * @return the number of bytes actually skipped.
+ *
+ * @throws IOException
+ * If the stream is already closed or another IOException occurs.
+ */
+ @Override
+ public final int skipBytes(int count) throws IOException {
+ int skipped = 0;
+ long skip;
+ while (skipped < count && (skip = in.skip(count - skipped)) != 0) {
+ skipped += skip;
+ }
+ if (skipped < 0) {
+ throw new EOFException();
+ }
+ return skipped;
+ }
+
+ public static String convertUTF8WithBuf(byte[] buf, char[] out, int offset,
+ int utfSize) throws UTFDataFormatException {
+ int count = 0, s = 0, a;
+ while (count < utfSize) {
+ if ((out[s] = (char) buf[offset + count++]) < '\u0080') {
+ s++;
+ } else if (((a = out[s]) & 0xe0) == 0xc0) {
+ if (count >= utfSize) {
+ throw new UTFDataFormatException();
+ }
+ int b = buf[count++];
+ if ((b & 0xC0) != 0x80) {
+ throw new UTFDataFormatException();
+ }
+ out[s++] = (char) (((a & 0x1F) << 6) | (b & 0x3F));
+ } else if ((a & 0xf0) == 0xe0) {
+ if (count + 1 >= utfSize) {
+ throw new UTFDataFormatException();
+ }
+ int b = buf[count++];
+ int c = buf[count++];
+ if (((b & 0xC0) != 0x80) || ((c & 0xC0) != 0x80)) {
+ throw new UTFDataFormatException();
+ }
+ out[s++] = (char) (((a & 0x0F) << 12) | ((b & 0x3F) << 6) | (c & 0x3F));
+ } else {
+ throw new UTFDataFormatException();
+ }
+ }
+ return new String(out, 0, s);
+ }
+
+}
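A small single-threaded sketch of the reuse pattern NonSyncDataInputBuffer is built for, assuming the caller already has serialized byte arrays in hand; reset() repoints the buffer without allocation or locking.

// Illustrative sketch, not part of this patch.
import org.apache.asterix.hivecompat.io.NonSyncDataInputBuffer;

public class InputBufferSketch {
    public static void main(String[] args) throws Exception {
        byte[] first = new byte[] { 0, 0, 0, 7 };   // big-endian int 7
        byte[] second = new byte[] { 0, 0, 0, 42 }; // big-endian int 42

        NonSyncDataInputBuffer in = new NonSyncDataInputBuffer();
        in.reset(first, first.length);
        System.out.println(in.readInt()); // 7

        // Reuse the same buffer for the next record: no locking, no new allocation.
        in.reset(second, second.length);
        System.out.println(in.readInt()); // 42
    }
}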
diff --git a/asterixdb/asterix-hivecompat/src/main/java/org/apache/asterix/hivecompat/io/NonSyncDataOutputBuffer.java b/asterixdb/asterix-hivecompat/src/main/java/org/apache/asterix/hivecompat/io/NonSyncDataOutputBuffer.java
new file mode 100644
index 0000000..878f130
--- /dev/null
+++ b/asterixdb/asterix-hivecompat/src/main/java/org/apache/asterix/hivecompat/io/NonSyncDataOutputBuffer.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.asterix.hivecompat.io;
+
+import java.io.DataInput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.hive.common.io.NonSyncByteArrayOutputStream;
+
+/**
+ * A non-thread-safe version of Hadoop's DataOutputBuffer, which removes all
+ * synchronized modifiers.
+ */
+public class NonSyncDataOutputBuffer extends DataOutputStream {
+
+ private final NonSyncByteArrayOutputStream buffer;
+
+ /** Constructs a new empty buffer. */
+ public NonSyncDataOutputBuffer() {
+ this(new NonSyncByteArrayOutputStream());
+ }
+
+ private NonSyncDataOutputBuffer(NonSyncByteArrayOutputStream buffer) {
+ super(buffer);
+ this.buffer = buffer;
+ }
+
+ /**
+ * Returns the current contents of the buffer. Data is only valid to
+ * {@link #getLength()}.
+ */
+ public byte[] getData() {
+ return buffer.getData();
+ }
+
+ /** Returns the length of the valid data currently in the buffer. */
+ public int getLength() {
+ return buffer.getLength();
+ }
+
+ /** Resets the buffer to empty. */
+ public NonSyncDataOutputBuffer reset() {
+ written = 0;
+ buffer.reset();
+ return this;
+ }
+
+ /** Writes bytes from a DataInput directly into the buffer. */
+ public void write(DataInput in, int length) throws IOException {
+ buffer.write(in, length);
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ buffer.write(b);
+ incCount(1);
+ }
+
+ @Override
+ public void write(byte b[], int off, int len) throws IOException {
+ buffer.write(b, off, len);
+ incCount(len);
+ }
+
+ private void incCount(int value) {
+ if (written + value < 0) {
+ written = Integer.MAX_VALUE;
+ } else {
+ written += value;
+ }
+ }
+}
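A sketch of a write-then-read round trip pairing the two non-synchronized buffers in one thread; the values written are arbitrary, and only the first getLength() bytes of getData() are valid.

// Illustrative sketch, not part of this patch.
import org.apache.asterix.hivecompat.io.NonSyncDataInputBuffer;
import org.apache.asterix.hivecompat.io.NonSyncDataOutputBuffer;

public class RoundTripSketch {
    public static void main(String[] args) throws Exception {
        NonSyncDataOutputBuffer out = new NonSyncDataOutputBuffer();
        out.writeInt(123);
        out.writeUTF("hello");

        NonSyncDataInputBuffer in = new NonSyncDataInputBuffer();
        // Only the first getLength() bytes of getData() hold valid data.
        in.reset(out.getData(), out.getLength());
        System.out.println(in.readInt()); // 123
        System.out.println(in.readUTF()); // hello
    }
}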
diff --git a/asterixdb/asterix-hivecompat/src/main/java/org/apache/asterix/hivecompat/io/RCFile.java b/asterixdb/asterix-hivecompat/src/main/java/org/apache/asterix/hivecompat/io/RCFile.java
new file mode 100644
index 0000000..0c5613a
--- /dev/null
+++ b/asterixdb/asterix-hivecompat/src/main/java/org/apache/asterix/hivecompat/io/RCFile.java
@@ -0,0 +1,2049 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.asterix.hivecompat.io;
+
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.rmi.server.UID;
+import java.security.MessageDigest;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ChecksumException;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
+import org.apache.hadoop.hive.serde2.columnar.BytesRefWritable;
+import org.apache.hadoop.hive.serde2.columnar.LazyDecompressionCallback;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile.Metadata;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.VersionMismatchException;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionInputStream;
+import org.apache.hadoop.io.compress.CompressionOutputStream;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * <code>RCFile</code>s, short for Record Columnar File, are flat files
+ * consisting of binary key/value pairs, which share much similarity with
+ * <code>SequenceFile</code>.
+ *
+ * RCFile stores columns of a table in a record columnar way. It first
+ * partitions rows horizontally into row splits, and then it vertically
+ * partitions each row split in a columnar way. RCFile first stores the
+ * metadata of a row split, as the key part of a record, and all the data of a row
+ * split as the value part. When writing, RCFile.Writer first holds records'
+ * value bytes in memory, and determines a row split if the raw byte size of
+ * buffered records overflows a given parameter <tt>Writer.columnsBufferSize</tt>,
+ * which can be set like: <code>conf.setInt(COLUMNS_BUFFER_SIZE_CONF_STR,
+ 4 * 1024 * 1024)</code> .
+ * <p>
+ * <code>RCFile</code> provides {@link Writer} and {@link Reader} classes for
+ * writing and reading, respectively.
+ * </p>
+ *
+ * <p>
+ * RCFile stores columns of a table in a record columnar way. It first
+ * partitions rows horizontally into row splits, and then it vertically
+ * partitions each row split in a columnar way. RCFile first stores the
+ * metadata of a row split, as the key part of a record, and all the data of a row
+ * split as the value part.
+ * </p>
+ *
+ * <p>
+ * RCFile compresses values in a more fine-grained manner than record-level
+ * compression. However, it currently does not support compressing the key part
+ * yet. The actual compression algorithm used to compress key and/or values can
+ * be specified by using the appropriate {@link CompressionCodec}.
+ * </p>
+ *
+ * <p>
+ * The {@link Reader} is used to read and interpret the bytes of an RCFile.
+ * </p>
+ *
+ * <h4 id="Formats">RCFile Formats</h4>
+ *
+ *
+ * <h5 id="Header">RC Header</h5>
+ * <ul>
+ * <li>version - 3 bytes of magic header <b>RCF</b>, followed by 1 byte of
+ * actual version number (e.g. RCF1)</li>
+ * <li>compression - A boolean which specifies if compression is turned on for
+ * keys/values in this file.</li>
+ * <li>compression codec - <code>CompressionCodec</code> class which is used
+ * for compression of keys and/or values (if compression is enabled).</li>
+ * <li>metadata - {@link Metadata} for this file.</li>
+ * <li>sync - A sync marker to denote end of the header.</li>
+ * </ul>
+ *
+ * <h5>RCFile Format</h5>
+ * <ul>
+ * <li><a href="#Header">Header</a></li>
+ * <li>Record
+ * <li>Key part
+ * <ul>
+ * <li>Record length in bytes</li>
+ * <li>Key length in bytes</li>
+ * <li>Number_of_rows_in_this_record(vint)</li>
+ * <li>Column_1_ondisk_length(vint)</li>
+ * <li>Column_1_row_1_value_plain_length</li>
+ * <li>Column_1_row_2_value_plain_length</li>
+ * <li>...</li>
+ * <li>Column_2_ondisk_length(vint)</li>
+ * <li>Column_2_row_1_value_plain_length</li>
+ * <li>Column_2_row_2_value_plain_length</li>
+ * <li>...</li>
+ * </ul>
+ * </li>
+ * </li>
+ * <li>Value part
+ * <ul>
+ * <li>Compressed or plain data of [column_1_row_1_value,
+ * column_1_row_2_value,....]</li>
+ * <li>Compressed or plain data of [column_2_row_1_value,
+ * column_2_row_2_value,....]</li>
+ * </ul>
+ * </li>
+ * </ul>
+ * <p>
+ * <pre>
+ * {@code
+ * The following is a pseudo-BNF grammar for RCFile. Comments are prefixed
+ * with dashes:
+ *
+ * rcfile ::=
+ * <file-header>
+ * <rcfile-rowgroup>+
+ *
+ * file-header ::=
+ * <file-version-header>
+ * <file-key-class-name> (only exists if version is seq6)
+ * <file-value-class-name> (only exists if version is seq6)
+ * <file-is-compressed>
+ * <file-is-block-compressed> (only exists if version is seq6)
+ * [<file-compression-codec-class>]
+ * <file-header-metadata>
+ * <file-sync-field>
+ *
+ * -- The normative RCFile implementation included with Hive is actually
+ * -- based on a modified version of Hadoop's SequenceFile code. Some
+ * -- things which should have been modified were not, including the code
+ * -- that writes out the file version header. Consequently, RCFile and
+ * -- SequenceFile originally shared the same version header. A newer
+ * -- release has created a unique version string.
+ *
+ * file-version-header ::= Byte[4] {'S', 'E', 'Q', 6}
+ * | Byte[4] {'R', 'C', 'F', 1}
+ *
+ * -- The name of the Java class responsible for reading the key buffer
+ * -- component of the rowgroup.
+ *
+ * file-key-class-name ::=
+ * Text {"org.apache.asterix.hivecompat.io.RCFile$KeyBuffer"}
+ *
+ * -- The name of the Java class responsible for reading the value buffer
+ * -- component of the rowgroup.
+ *
+ * file-value-class-name ::=
+ * Text {"org.apache.asterix.hivecompat.io.RCFile$ValueBuffer"}
+ *
+ * -- Boolean variable indicating whether or not the file uses compression
+ * -- for the key and column buffer sections.
+ *
+ * file-is-compressed ::= Byte[1]
+ *
+ * -- A boolean field indicating whether or not the file is block compressed.
+ * -- This field is *always* false. According to comments in the original
+ * -- RCFile implementation this field was retained for backwards
+ * -- compatibility with the SequenceFile format.
+ *
+ * file-is-block-compressed ::= Byte[1] {false}
+ *
+ * -- The Java class name of the compression codec iff <file-is-compressed>
+ * -- is true. The named class must implement
+ * -- org.apache.hadoop.io.compress.CompressionCodec.
+ * -- The expected value is org.apache.hadoop.io.compress.GzipCodec.
+ *
+ * file-compression-codec-class ::= Text
+ *
+ * -- A collection of key-value pairs defining metadata values for the
+ * -- file. The Map is serialized using standard JDK serialization, i.e.
+ * -- an Int corresponding to the number of key-value pairs, followed by
+ * -- Text key and value pairs. The following metadata properties are
+ * -- mandatory for all RCFiles:
+ * --
+ * -- hive.io.rcfile.column.number: the number of columns in the RCFile
+ *
+ * file-header-metadata ::= Map<Text, Text>
+ *
+ * -- A 16 byte marker that is generated by the writer. This marker appears
+ * -- at regular intervals at the beginning of rowgroup-headers, and is
+ * -- intended to enable readers to skip over corrupted rowgroups.
+ *
+ * file-sync-hash ::= Byte[16]
+ *
+ * -- Each row group is split into three sections: a header, a set of
+ * -- key buffers, and a set of column buffers. The header section includes
+ * -- an optional sync hash, information about the size of the row group, and
+ * -- the total number of rows in the row group. Each key buffer
+ * -- consists of run-length encoding data which is used to decode
+ * -- the length and offsets of individual fields in the corresponding column
+ * -- buffer.
+ *
+ * rcfile-rowgroup ::=
+ * <rowgroup-header>
+ * <rowgroup-key-data>
+ * <rowgroup-column-buffers>
+ *
+ * rowgroup-header ::=
+ * [<rowgroup-sync-marker>, <rowgroup-sync-hash>]
+ * <rowgroup-record-length>
+ * <rowgroup-key-length>
+ * <rowgroup-compressed-key-length>
+ *
+ * -- rowgroup-key-data is compressed if the column data is compressed.
+ * rowgroup-key-data ::=
+ * <rowgroup-num-rows>
+ * <rowgroup-key-buffers>
+ *
+ * -- An integer (always -1) signaling the beginning of a sync-hash
+ * -- field.
+ *
+ * rowgroup-sync-marker ::= Int
+ *
+ * -- A 16 byte sync field. This must match the <file-sync-hash> value read
+ * -- in the file header.
+ *
+ * rowgroup-sync-hash ::= Byte[16]
+ *
+ * -- The record-length is the sum of the number of bytes used to store
+ * -- the key and column parts, i.e. it is the total length of the current
+ * -- rowgroup.
+ *
+ * rowgroup-record-length ::= Int
+ *
+ * -- Total length in bytes of the rowgroup's key sections.
+ *
+ * rowgroup-key-length ::= Int
+ *
+ * -- Total compressed length in bytes of the rowgroup's key sections.
+ *
+ * rowgroup-compressed-key-length ::= Int
+ *
+ * -- Number of rows in the current rowgroup.
+ *
+ * rowgroup-num-rows ::= VInt
+ *
+ * -- One or more column key buffers corresponding to each column
+ * -- in the RCFile.
+ *
+ * rowgroup-key-buffers ::= <rowgroup-key-buffer>+
+ *
+ * -- Data in each column buffer is stored using a run-length
+ * -- encoding scheme that is intended to reduce the cost of
+ * -- repeated column field values. This mechanism is described
+ * -- in more detail in the following entries.
+ *
+ * rowgroup-key-buffer ::=
+ * <column-buffer-length>
+ * <column-buffer-uncompressed-length>
+ * <column-key-buffer-length>
+ * <column-key-buffer>
+ *
+ * -- The serialized length on disk of the corresponding column buffer.
+ *
+ * column-buffer-length ::= VInt
+ *
+ * -- The uncompressed length of the corresponding column buffer. This
+ * -- is equivalent to column-buffer-length if the RCFile is not compressed.
+ *
+ * column-buffer-uncompressed-length ::= VInt
+ *
+ * -- The length in bytes of the current column key buffer
+ *
+ * column-key-buffer-length ::= VInt
+ *
+ * -- The column-key-buffer contains a sequence of serialized VInt values
+ * -- corresponding to the byte lengths of the serialized column fields
+ * -- in the corresponding rowgroup-column-buffer. For example, consider
+ * -- an integer column that contains the consecutive values 1, 2, 3, 44.
+ * -- The RCFile format stores these values as strings in the column buffer,
+ * -- e.g. "12344". The length of each column field is recorded in
+ * -- the column-key-buffer as a sequence of VInts: 1,1,1,2. However,
+ * -- if the same length occurs repeatedly, then we replace repeated
+ * -- run lengths with the complement (i.e. negative) of the number of
+ * -- repetitions, so 1,1,1,2 becomes 1,~2,2.
+ *
+ * column-key-buffer ::= Byte[column-key-buffer-length]
+ *
+ * rowgroup-column-buffers ::= <rowgroup-value-buffer>+
+ *
+ * -- RCFile stores all column data as strings regardless of the
+ * -- underlying column type. The strings are neither length-prefixed nor
+ * -- null-terminated, and decoding them into individual fields requires
+ * -- the use of the run-length information contained in the corresponding
+ * -- column-key-buffer.
+ *
+ * rowgroup-column-buffer ::= Byte[column-buffer-length]
+ *
+ * Byte ::= An eight-bit byte
+ *
+ * VInt ::= Variable length integer. The high-order bit of each byte
+ * indicates whether more bytes remain to be read. The low-order seven
+ * bits are appended as increasingly more significant bits in the
+ * resulting integer value.
+ *
+ * Int ::= A four-byte integer in big-endian format.
+ *
+ * Text ::= VInt, Chars (Length prefixed UTF-8 characters)
+ * }
+ * </pre>
+ * </p>
+ */
+public class RCFile {
+
+ private static final Log LOG = LogFactory.getLog(RCFile.class);
+
+ public static final String RECORD_INTERVAL_CONF_STR = "hive.io.rcfile.record.interval";
+
+ public static final String COLUMN_NUMBER_METADATA_STR = "hive.io.rcfile.column.number";
+
+ public static final String COLUMN_NUMBER_CONF_STR = "hive.io.rcfile.column.number.conf";
+
+ public static final String TOLERATE_CORRUPTIONS_CONF_STR =
+ "hive.io.rcfile.tolerate.corruptions";
+
+ // HACK: We actually need BlockMissingException, but that is not available
+ // in all hadoop versions.
+ public static final String BLOCK_MISSING_MESSAGE =
+ "Could not obtain block";
+
+ // All of the versions should be placed in this list.
+ private static final int ORIGINAL_VERSION = 0; // version with SEQ
+ private static final int NEW_MAGIC_VERSION = 1; // version with RCF
+
+ private static final int CURRENT_VERSION = NEW_MAGIC_VERSION;
+
+ // The first version of RCFile used the sequence file header.
+ private static final byte[] ORIGINAL_MAGIC = new byte[] {
+ (byte) 'S', (byte) 'E', (byte) 'Q'};
+ // the version that was included with the original magic, which is mapped
+ // into ORIGINAL_VERSION
+ private static final byte ORIGINAL_MAGIC_VERSION_WITH_METADATA = 6;
+
+ private static final byte[] ORIGINAL_MAGIC_VERSION = new byte[] {
+ (byte) 'S', (byte) 'E', (byte) 'Q', ORIGINAL_MAGIC_VERSION_WITH_METADATA
+ };
+
+ // The 'magic' bytes at the beginning of the RCFile
+ private static final byte[] MAGIC = new byte[] {
+ (byte) 'R', (byte) 'C', (byte) 'F'};
+
+ private static final int SYNC_ESCAPE = -1; // "length" of sync entries
+ private static final int SYNC_HASH_SIZE = 16; // number of bytes in hash
+ private static final int SYNC_SIZE = 4 + SYNC_HASH_SIZE; // escape + hash
+
+ /** The number of bytes between sync points. */
+ public static final int SYNC_INTERVAL = 100 * SYNC_SIZE;
+
+ /**
+ * KeyBuffer is the key of each record in RCFile. Its on-disk layout is as
+ * below:
+ *
+ * <ul>
+ * <li>record length in bytes, it is the sum of bytes used to store the key
+ * part and the value part.</li>
+ * <li>Key length in bytes, it is how many bytes are used by the key part.</li>
+ * <li>number_of_rows_in_this_record(vint),</li>
+ * <li>column_1_ondisk_length(vint),</li>
+ * <li>column_1_row_1_value_plain_length,</li>
+ * <li>column_1_row_2_value_plain_length,</li>
+ * <li>....</li>
+ * <li>column_2_ondisk_length(vint),</li>
+ * <li>column_2_row_1_value_plain_length,</li>
+ * <li>column_2_row_2_value_plain_length,</li>
+ * <li>.... .</li>
+ * <li>{the end of the key part}</li>
+ * </ul>
+ */
+ public static class KeyBuffer implements WritableComparable {
+ // each column's length in the value
+ private int[] eachColumnValueLen = null;
+ private int[] eachColumnUncompressedValueLen = null;
+ // stores each cell's length of a column in one DataOutputBuffer element
+ private NonSyncDataOutputBuffer[] allCellValLenBuffer = null;
+ // how many rows in this split
+ private int numberRows = 0;
+ // how many columns
+ private int columnNumber = 0;
+
+ // return the number of columns recorded in this file's header
+ public int getColumnNumber() {
+ return columnNumber;
+ }
+
+ @SuppressWarnings("unused")
+ @Deprecated
+ public KeyBuffer(){
+ }
+
+ KeyBuffer(int columnNum) {
+ columnNumber = columnNum;
+ eachColumnValueLen = new int[columnNumber];
+ eachColumnUncompressedValueLen = new int[columnNumber];
+ allCellValLenBuffer = new NonSyncDataOutputBuffer[columnNumber];
+ }
+
+ @SuppressWarnings("unused")
+ @Deprecated
+ KeyBuffer(int numberRows, int columnNum) {
+ this(columnNum);
+ this.numberRows = numberRows;
+ }
+
+ public void nullColumn(int columnIndex) {
+ eachColumnValueLen[columnIndex] = 0;
+ eachColumnUncompressedValueLen[columnIndex] = 0;
+ allCellValLenBuffer[columnIndex] = new NonSyncDataOutputBuffer();
+ }
+
+ /**
+ * add in a new column's metadata.
+ *
+ * @param columnValueLen
+ * the total number of bytes of this column's values in this split
+ * @param colValLenBuffer
+ * each cell's length of this column in this split
+ */
+ void setColumnLenInfo(int columnValueLen,
+ NonSyncDataOutputBuffer colValLenBuffer,
+ int columnUncompressedValueLen, int columnIndex) {
+ eachColumnValueLen[columnIndex] = columnValueLen;
+ eachColumnUncompressedValueLen[columnIndex] = columnUncompressedValueLen;
+ allCellValLenBuffer[columnIndex] = colValLenBuffer;
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ eachColumnValueLen = new int[columnNumber];
+ eachColumnUncompressedValueLen = new int[columnNumber];
+ allCellValLenBuffer = new NonSyncDataOutputBuffer[columnNumber];
+
+ numberRows = WritableUtils.readVInt(in);
+ for (int i = 0; i < columnNumber; i++) {
+ eachColumnValueLen[i] = WritableUtils.readVInt(in);
+ eachColumnUncompressedValueLen[i] = WritableUtils.readVInt(in);
+ int bufLen = WritableUtils.readVInt(in);
+ if (allCellValLenBuffer[i] == null) {
+ allCellValLenBuffer[i] = new NonSyncDataOutputBuffer();
+ } else {
+ allCellValLenBuffer[i].reset();
+ }
+ allCellValLenBuffer[i].write(in, bufLen);
+ }
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ // out.writeInt(numberRows);
+ WritableUtils.writeVLong(out, numberRows);
+ for (int i = 0; i < eachColumnValueLen.length; i++) {
+ WritableUtils.writeVLong(out, eachColumnValueLen[i]);
+ WritableUtils.writeVLong(out, eachColumnUncompressedValueLen[i]);
+ NonSyncDataOutputBuffer colRowsLenBuf = allCellValLenBuffer[i];
+ int bufLen = colRowsLenBuf.getLength();
+ WritableUtils.writeVLong(out, bufLen);
+ out.write(colRowsLenBuf.getData(), 0, bufLen);
+ }
+ }
+
+ /**
+ * get number of bytes to store the keyBuffer.
+ *
+ * @return number of bytes used to store this KeyBuffer on disk
+ * @throws IOException
+ */
+ public int getSize() throws IOException {
+ int ret = 0;
+ ret += WritableUtils.getVIntSize(numberRows);
+ for (int i = 0; i < eachColumnValueLen.length; i++) {
+ ret += WritableUtils.getVIntSize(eachColumnValueLen[i]);
+ ret += WritableUtils.getVIntSize(eachColumnUncompressedValueLen[i]);
+ ret += WritableUtils.getVIntSize(allCellValLenBuffer[i].getLength());
+ ret += allCellValLenBuffer[i].getLength();
+ }
+
+ return ret;
+ }
+
+ @Override
+ public int compareTo(Object arg0) {
+ throw new RuntimeException("compareTo not supported in class "
+ + this.getClass().getName());
+ }
+
+ public int[] getEachColumnUncompressedValueLen() {
+ return eachColumnUncompressedValueLen;
+ }
+
+ public int[] getEachColumnValueLen() {
+ return eachColumnValueLen;
+ }
+
+ /**
+ * @return the numberRows
+ */
+ public int getNumberRows() {
+ return numberRows;
+ }
+ }
+
+ /**
+ * ValueBuffer is the value of each record in RCFile. Its on-disk layout is as
+ * below:
+ * <ul>
+ * <li>Compressed or plain data of [column_1_row_1_value,
+ * column_1_row_2_value,....]</li>
+ * <li>Compressed or plain data of [column_2_row_1_value,
+ * column_2_row_2_value,....]</li>
+ * </ul>
+ */
+ public static class ValueBuffer implements WritableComparable {
+
+ class LazyDecompressionCallbackImpl implements LazyDecompressionCallback {
+
+ int index = -1;
+ int colIndex = -1;
+
+ public LazyDecompressionCallbackImpl(int index, int colIndex) {
+ super();
+ this.index = index;
+ this.colIndex = colIndex;
+ }
+
+ @Override
+ public byte[] decompress() throws IOException {
+
+ if (decompressedFlag[index] || codec == null) {
+ return loadedColumnsValueBuffer[index].getData();
+ }
+
+ NonSyncDataOutputBuffer compressedData = compressedColumnsValueBuffer[index];
+ decompressBuffer.reset();
+ DataInputStream valueIn = new DataInputStream(deflatFilter);
+ deflatFilter.resetState();
+ if (deflatFilter instanceof SchemaAwareCompressionInputStream) {
+ ((SchemaAwareCompressionInputStream)deflatFilter).setColumnIndex(colIndex);
+ }
+ decompressBuffer.reset(compressedData.getData(),
+ keyBuffer.eachColumnValueLen[colIndex]);
+
+ NonSyncDataOutputBuffer decompressedColBuf = loadedColumnsValueBuffer[index];
+ decompressedColBuf.reset();
+ decompressedColBuf.write(valueIn,
+ keyBuffer.eachColumnUncompressedValueLen[colIndex]);
+ decompressedFlag[index] = true;
+ numCompressed--;
+ return decompressedColBuf.getData();
+ }
+ }
+
+ // used to load columns' value into memory
+ private NonSyncDataOutputBuffer[] loadedColumnsValueBuffer = null;
+ private NonSyncDataOutputBuffer[] compressedColumnsValueBuffer = null;
+ private boolean[] decompressedFlag = null;
+ private int numCompressed;
+ private LazyDecompressionCallbackImpl[] lazyDecompressCallbackObjs = null;
+ private boolean lazyDecompress = true;
+
+ boolean inited = false;
+
+ // used for readFields
+ KeyBuffer keyBuffer;
+ private int columnNumber = 0;
+
+ // set to true for columns that are skipped (not loaded into memory).
+ boolean[] skippedColIDs = null;
+
+ CompressionCodec codec;
+
+ Decompressor valDecompressor = null;
+ NonSyncDataInputBuffer decompressBuffer = new NonSyncDataInputBuffer();
+ CompressionInputStream deflatFilter = null;
+
+ @SuppressWarnings("unused")
+ @Deprecated
+ public ValueBuffer() throws IOException {
+ }
+
+ @SuppressWarnings("unused")
+ @Deprecated
+ public ValueBuffer(KeyBuffer keyBuffer) throws IOException {
+ this(keyBuffer, keyBuffer.columnNumber, null, null, true);
+ }
+
+ @SuppressWarnings("unused")
+ @Deprecated
+ public ValueBuffer(KeyBuffer keyBuffer, boolean[] skippedColIDs)
+ throws IOException {
+ this(keyBuffer, keyBuffer.columnNumber, skippedColIDs, null, true);
+ }
+
+ @SuppressWarnings("unused")
+ @Deprecated
+ public ValueBuffer(KeyBuffer currentKey, int columnNumber,
+ boolean[] skippedCols, CompressionCodec codec) throws IOException {
+ this(currentKey, columnNumber, skippedCols, codec, true);
+ }
+
+ public ValueBuffer(KeyBuffer currentKey, int columnNumber,
+ boolean[] skippedCols, CompressionCodec codec, boolean lazyDecompress)
+ throws IOException {
+ this.lazyDecompress = lazyDecompress;
+ keyBuffer = currentKey;
+ this.columnNumber = columnNumber;
+
+ if (skippedCols != null && skippedCols.length > 0) {
+ skippedColIDs = skippedCols;
+ } else {
+ skippedColIDs = new boolean[columnNumber];
+ for (int i = 0; i < skippedColIDs.length; i++) {
+ skippedColIDs[i] = false;
+ }
+ }
+
+ int skipped = 0;
+ for (boolean currentSkip : skippedColIDs) {
+ if (currentSkip) {
+ skipped++;
+ }
+ }
+ loadedColumnsValueBuffer = new NonSyncDataOutputBuffer[columnNumber
+ - skipped];
+ decompressedFlag = new boolean[columnNumber - skipped];
+ lazyDecompressCallbackObjs = new LazyDecompressionCallbackImpl[columnNumber
+ - skipped];
+ compressedColumnsValueBuffer = new NonSyncDataOutputBuffer[columnNumber
+ - skipped];
+ this.codec = codec;
+ if (codec != null) {
+ valDecompressor = CodecPool.getDecompressor(codec);
+ deflatFilter = codec.createInputStream(decompressBuffer,
+ valDecompressor);
+ }
+ if (codec != null) {
+ numCompressed = decompressedFlag.length;
+ } else {
+ numCompressed = 0;
+ }
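+      // Allocate buffers only for columns that are actually loaded; readIndex is a
+      // compact index over the non-skipped columns. Compressed columns additionally
+      // get a compressed buffer and a lazy-decompression callback.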
+ for (int k = 0, readIndex = 0; k < columnNumber; k++) {
+ if (skippedColIDs[k]) {
+ continue;
+ }
+ loadedColumnsValueBuffer[readIndex] = new NonSyncDataOutputBuffer();
+ if (codec != null) {
+ decompressedFlag[readIndex] = false;
+ lazyDecompressCallbackObjs[readIndex] = new LazyDecompressionCallbackImpl(
+ readIndex, k);
+ compressedColumnsValueBuffer[readIndex] = new NonSyncDataOutputBuffer();
+ } else {
+ decompressedFlag[readIndex] = true;
+ }
+ readIndex++;
+ }
+ }
+
+ @SuppressWarnings("unused")
+ @Deprecated
+ public void setColumnValueBuffer(NonSyncDataOutputBuffer valBuffer,
+ int addIndex) {
+ loadedColumnsValueBuffer[addIndex] = valBuffer;
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ int addIndex = 0;
+ int skipTotal = 0;
+ for (int i = 0; i < columnNumber; i++) {
+ int vaRowsLen = keyBuffer.eachColumnValueLen[i];
+ // skip this column
+ if (skippedColIDs[i]) {
+ skipTotal += vaRowsLen;
+ continue;
+ }
+
+ if (skipTotal != 0) {
+ in.skipBytes(skipTotal);
+ skipTotal = 0;
+ }
+
+ NonSyncDataOutputBuffer valBuf;
+ if (codec != null){
+ // load into compressed buf first
+ valBuf = compressedColumnsValueBuffer[addIndex];
+ } else {
+ valBuf = loadedColumnsValueBuffer[addIndex];
+ }
+ valBuf.reset();
+ valBuf.write(in, vaRowsLen);
+ if (codec != null) {
+ decompressedFlag[addIndex] = false;
+ if (!lazyDecompress) {
+ lazyDecompressCallbackObjs[addIndex].decompress();
+ decompressedFlag[addIndex] = true;
+ }
+ }
+ addIndex++;
+ }
+ if (codec != null) {
+ numCompressed = decompressedFlag.length;
+ }
+
+ if (skipTotal != 0) {
+ in.skipBytes(skipTotal);
+ }
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ if (codec != null) {
+ for (NonSyncDataOutputBuffer currentBuf : compressedColumnsValueBuffer) {
+ out.write(currentBuf.getData(), 0, currentBuf.getLength());
+ }
+ } else {
+ for (NonSyncDataOutputBuffer currentBuf : loadedColumnsValueBuffer) {
+ out.write(currentBuf.getData(), 0, currentBuf.getLength());
+ }
+ }
+ }
+
+ public void nullColumn(int columnIndex) {
+ if (codec != null) {
+ compressedColumnsValueBuffer[columnIndex].reset();
+ } else {
+ loadedColumnsValueBuffer[columnIndex].reset();
+ }
+ }
+
+ public void clearColumnBuffer() throws IOException {
+ decompressBuffer.reset();
+ }
+
+ public void close() {
+ for (NonSyncDataOutputBuffer element : loadedColumnsValueBuffer) {
+ IOUtils.closeStream(element);
+ }
+ if (codec != null) {
+ IOUtils.closeStream(decompressBuffer);
+ if (valDecompressor != null) {
+ // Make sure we only return valDecompressor once.
+ CodecPool.returnDecompressor(valDecompressor);
+ valDecompressor = null;
+ }
+ }
+ }
+
+ @Override
+ public int compareTo(Object arg0) {
+ throw new RuntimeException("compareTo not supported in class "
+ + this.getClass().getName());
+ }
+ }
+
+ /**
+ * Create a metadata object with alternating key-value pairs.
+ * Eg. metadata(key1, value1, key2, value2)
+ */
+ public static Metadata createMetadata(Text... values) {
+ if (values.length % 2 != 0) {
+ throw new IllegalArgumentException("Must have a matched set of " +
+ "key-value pairs. " + values.length+
+ " strings supplied.");
+ }
+ Metadata result = new Metadata();
+ for(int i=0; i < values.length; i += 2) {
+ result.set(values[i], values[i+1]);
+ }
+ return result;
+ }
+
+ /**
+ * Write KeyBuffer/ValueBuffer pairs to a RCFile. RCFile's format is
+ * compatible with SequenceFile's.
+ *
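+   * A minimal usage sketch (fs, conf, path and numColumns are placeholders; the
+   * column count is assumed to be set via COLUMN_NUMBER_CONF_STR before the
+   * writer is constructed):
+   * <pre>{@code
+   *   conf.setInt(COLUMN_NUMBER_CONF_STR, numColumns);
+   *   RCFile.Writer writer = new RCFile.Writer(fs, conf, path);
+   *   BytesRefArrayWritable row = new BytesRefArrayWritable(numColumns);
+   *   // populate row with one BytesRefWritable per column, then:
+   *   writer.append(row);
+   *   writer.close();
+   * }</pre>
+   *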
+ */
+ public static class Writer {
+
+ Configuration conf;
+ FSDataOutputStream out;
+
+ CompressionCodec codec = null;
+ Metadata metadata = null;
+
+ // Insert a globally unique 16-byte value every few entries, so that one
+ // can seek into the middle of a file and then synchronize with record
+ // starts and ends by scanning for this value.
+ long lastSyncPos; // position of last sync
+ byte[] sync; // 16 random bytes
+ {
+ try {
+ MessageDigest digester = MessageDigest.getInstance("MD5");
+ long time = System.currentTimeMillis();
+ digester.update((new UID() + "@" + time).getBytes());
+ sync = digester.digest();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ // how many records the writer buffers before it writes to disk
+ private int RECORD_INTERVAL = Integer.MAX_VALUE;
+    // the max size of memory for buffering records before writing them out
+ private int columnsBufferSize = 4 * 1024 * 1024; // 4M
+ // the conf string for COLUMNS_BUFFER_SIZE
+ public static String COLUMNS_BUFFER_SIZE_CONF_STR = "hive.io.rcfile.record.buffer.size";
+
+ // how many records already buffered
+ private int bufferedRecords = 0;
+
+ private final ColumnBuffer[] columnBuffers;
+
+ private int columnNumber = 0;
+
+ private final int[] columnValuePlainLength;
+
+ KeyBuffer key = null;
+ private final int[] plainTotalColumnLength;
+ private final int[] comprTotalColumnLength;
+
+ boolean useNewMagic = true;
+
+ /*
+     * used for buffering appends before flushing them out
+ */
+ class ColumnBuffer {
+ // used for buffer a column's values
+ NonSyncDataOutputBuffer columnValBuffer;
+ // used to store each value's length
+ NonSyncDataOutputBuffer valLenBuffer;
+
+ /*
+       * Use a run-length encoding. We only record a run length if the same
+       * 'prevValueLen' occurs more than once, and we negate the run length to
+       * distinguish it from a normal value length. For example, if the values'
+       * lengths are 1,1,1,2 we record 1, ~2, 2; for value lengths 1,2,3 we
+       * record 1, 2, 3.
+ */
+ int runLength = 0;
+ int prevValueLength = -1;
+
+ ColumnBuffer() throws IOException {
+ columnValBuffer = new NonSyncDataOutputBuffer();
+ valLenBuffer = new NonSyncDataOutputBuffer();
+ }
+
+ public void append(BytesRefWritable data) throws IOException {
+ data.writeDataTo(columnValBuffer);
+ int currentLen = data.getLength();
+
+ if (prevValueLength < 0) {
+ startNewGroup(currentLen);
+ return;
+ }
+
+ if (currentLen != prevValueLength) {
+ flushGroup();
+ startNewGroup(currentLen);
+ } else {
+ runLength++;
+ }
+ }
+
+ private void startNewGroup(int currentLen) {
+ prevValueLength = currentLen;
+ runLength = 0;
+ }
+
+ public void clear() throws IOException {
+ valLenBuffer.reset();
+ columnValBuffer.reset();
+ prevValueLength = -1;
+ runLength = 0;
+ }
+
+ public void flushGroup() throws IOException {
+ if (prevValueLength >= 0) {
+ WritableUtils.writeVLong(valLenBuffer, prevValueLength);
+ if (runLength > 0) {
+ WritableUtils.writeVLong(valLenBuffer, ~runLength);
+ }
+ runLength = -1;
+ prevValueLength = -1;
+ }
+ }
+ }
+
+ public long getLength() throws IOException {
+ return out.getPos();
+ }
+
+ /** Constructs a RCFile Writer. */
+ public Writer(FileSystem fs, Configuration conf, Path name) throws IOException {
+ this(fs, conf, name, null, new Metadata(), null);
+ }
+
+ /**
+ * Constructs a RCFile Writer.
+ *
+ * @param fs
+ * the file system used
+ * @param conf
+ * the configuration file
+ * @param name
+ * the file name
+ * @throws IOException
+ */
+ public Writer(FileSystem fs, Configuration conf, Path name,
+ Progressable progress, CompressionCodec codec) throws IOException {
+ this(fs, conf, name, progress, new Metadata(), codec);
+ }
+
+ /**
+ * Constructs a RCFile Writer.
+ *
+ * @param fs
+ * the file system used
+ * @param conf
+ * the configuration file
+ * @param name
+ * the file name
+ * @param progress a progress meter to update as the file is written
+ * @param metadata a string to string map in the file header
+ * @throws IOException
+ */
+ public Writer(FileSystem fs, Configuration conf, Path name,
+ Progressable progress, Metadata metadata, CompressionCodec codec) throws IOException {
+ this(fs, conf, name, fs.getConf().getInt("io.file.buffer.size", 4096),
+ ShimLoader.getHadoopShims().getDefaultReplication(fs, name),
+ ShimLoader.getHadoopShims().getDefaultBlockSize(fs, name), progress,
+ metadata, codec);
+ }
+
+ /**
+ *
+ * Constructs a RCFile Writer.
+ *
+ * @param fs
+ * the file system used
+ * @param conf
+ * the configuration file
+ * @param name
+ * the file name
+ * @param bufferSize the size of the file buffer
+ * @param replication the number of replicas for the file
+ * @param blockSize the block size of the file
+ * @param progress the progress meter for writing the file
+ * @param metadata a string to string map in the file header
+ * @throws IOException
+ */
+ public Writer(FileSystem fs, Configuration conf, Path name, int bufferSize,
+ short replication, long blockSize, Progressable progress,
+ Metadata metadata, CompressionCodec codec) throws IOException {
+ RECORD_INTERVAL = conf.getInt(RECORD_INTERVAL_CONF_STR, RECORD_INTERVAL);
+ columnNumber = conf.getInt(COLUMN_NUMBER_CONF_STR, 0);
+
+ if (metadata == null) {
+ metadata = new Metadata();
+ }
+ metadata.set(new Text(COLUMN_NUMBER_METADATA_STR), new Text(""
+ + columnNumber));
+
+ columnsBufferSize = conf.getInt(COLUMNS_BUFFER_SIZE_CONF_STR,
+ 4 * 1024 * 1024);
+
+ columnValuePlainLength = new int[columnNumber];
+
+ columnBuffers = new ColumnBuffer[columnNumber];
+ for (int i = 0; i < columnNumber; i++) {
+ columnBuffers[i] = new ColumnBuffer();
+ }
+
+ init(conf, fs.create(name, true, bufferSize, replication,
+ blockSize, progress), codec, metadata);
+ initializeFileHeader();
+ writeFileHeader();
+ finalizeFileHeader();
+ key = new KeyBuffer(columnNumber);
+
+ plainTotalColumnLength = new int[columnNumber];
+ comprTotalColumnLength = new int[columnNumber];
+ }
+
+ /** Write the initial part of file header. */
+ void initializeFileHeader() throws IOException {
+ if (useNewMagic) {
+ out.write(MAGIC);
+ out.write(CURRENT_VERSION);
+ } else {
+ out.write(ORIGINAL_MAGIC_VERSION);
+ }
+ }
+
+ /** Write the final part of file header. */
+ void finalizeFileHeader() throws IOException {
+ out.write(sync); // write the sync bytes
+ out.flush(); // flush header
+ }
+
+ boolean isCompressed() {
+ return codec != null;
+ }
+
+ /** Write and flush the file header. */
+ void writeFileHeader() throws IOException {
+ if (useNewMagic) {
+ out.writeBoolean(isCompressed());
+ } else {
+ Text.writeString(out, KeyBuffer.class.getName());
+ Text.writeString(out, ValueBuffer.class.getName());
+ out.writeBoolean(isCompressed());
+ out.writeBoolean(false);
+ }
+
+ if (isCompressed()) {
+ Text.writeString(out, (codec.getClass()).getName());
+ }
+ metadata.write(out);
+ }
+
+ void init(Configuration conf, FSDataOutputStream out,
+ CompressionCodec codec, Metadata metadata) throws IOException {
+ this.conf = conf;
+ this.out = out;
+ this.codec = codec;
+ this.metadata = metadata;
+ this.useNewMagic =
+ conf.getBoolean(HiveConf.ConfVars.HIVEUSEEXPLICITRCFILEHEADER.varname, true);
+ }
+
+ /** Returns the compression codec of data in this file. */
+ @SuppressWarnings("unused")
+ @Deprecated
+ public CompressionCodec getCompressionCodec() {
+ return codec;
+ }
+
+ /** create a sync point. */
+ public void sync() throws IOException {
+ if (sync != null && lastSyncPos != out.getPos()) {
+ out.writeInt(SYNC_ESCAPE); // mark the start of the sync
+ out.write(sync); // write sync
+ lastSyncPos = out.getPos(); // update lastSyncPos
+ }
+ }
+
+ /** Returns the configuration of this file. */
+ @SuppressWarnings("unused")
+ @Deprecated
+ Configuration getConf() {
+ return conf;
+ }
+
+ private void checkAndWriteSync() throws IOException {
+ if (sync != null && out.getPos() >= lastSyncPos + SYNC_INTERVAL) {
+ sync();
+ }
+ }
+
+ private int columnBufferSize = 0;
+
+ /**
+     * Append a row of values. Currently it can only accept
+     * {@link BytesRefArrayWritable}. If its <code>size()</code> is less than the
+     * column number in the file, zero bytes are appended for the empty columns.
+     * If its <code>size()</code> is greater than the column number in the file,
+     * the extra columns' bytes are ignored.
+ *
+ * @param val a BytesRefArrayWritable with the list of serialized columns
+ * @throws IOException
+ */
+ public void append(Writable val) throws IOException {
+
+ if (!(val instanceof BytesRefArrayWritable)) {
+ throw new UnsupportedOperationException(
+ "Currently the writer can only accept BytesRefArrayWritable");
+ }
+
+ BytesRefArrayWritable columns = (BytesRefArrayWritable) val;
+ int size = columns.size();
+ for (int i = 0; i < size; i++) {
+ BytesRefWritable cu = columns.get(i);
+ int plainLen = cu.getLength();
+ columnBufferSize += plainLen;
+ columnValuePlainLength[i] += plainLen;
+ columnBuffers[i].append(cu);
+ }
+
+ if (size < columnNumber) {
+ for (int i = columns.size(); i < columnNumber; i++) {
+ columnBuffers[i].append(BytesRefWritable.ZeroBytesRefWritable);
+ }
+ }
+
+ bufferedRecords++;
+ if ((columnBufferSize > columnsBufferSize)
+ || (bufferedRecords >= RECORD_INTERVAL)) {
+ flushRecords();
+ }
+ }
+
+ private void flushRecords() throws IOException {
+
+ key.numberRows = bufferedRecords;
+
+ Compressor compressor = null;
+ NonSyncDataOutputBuffer valueBuffer = null;
+ CompressionOutputStream deflateFilter = null;
+ DataOutputStream deflateOut = null;
+ boolean isCompressed = isCompressed();
+ int valueLength = 0;
+ if (isCompressed) {
+ ReflectionUtils.setConf(codec, this.conf);
+ compressor = CodecPool.getCompressor(codec);
+ valueBuffer = new NonSyncDataOutputBuffer();
+ deflateFilter = codec.createOutputStream(valueBuffer, compressor);
+ deflateOut = new DataOutputStream(deflateFilter);
+ }
+
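+      // Flush each column's buffered group, then (when compressed) pipe the column
+      // bytes through the shared deflate stream into a single value buffer; the
+      // per-column compressed length is the growth of that buffer.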
+ for (int columnIndex = 0; columnIndex < columnNumber; columnIndex++) {
+ ColumnBuffer currentBuf = columnBuffers[columnIndex];
+ currentBuf.flushGroup();
+
+ NonSyncDataOutputBuffer columnValue = currentBuf.columnValBuffer;
+ int colLen;
+ int plainLen = columnValuePlainLength[columnIndex];
+
+ if (isCompressed) {
+ if (deflateFilter instanceof SchemaAwareCompressionOutputStream) {
+ ((SchemaAwareCompressionOutputStream)deflateFilter).
+ setColumnIndex(columnIndex);
+ }
+ deflateFilter.resetState();
+ deflateOut.write(columnValue.getData(), 0, columnValue.getLength());
+ deflateOut.flush();
+ deflateFilter.finish();
+ // find how much compressed data was added for this column
+ colLen = valueBuffer.getLength() - valueLength;
+ } else {
+ colLen = columnValuePlainLength[columnIndex];
+ }
+ valueLength += colLen;
+ key.setColumnLenInfo(colLen, currentBuf.valLenBuffer, plainLen,
+ columnIndex);
+ plainTotalColumnLength[columnIndex] += plainLen;
+ comprTotalColumnLength[columnIndex] += colLen;
+ columnValuePlainLength[columnIndex] = 0;
+ }
+
+ int keyLength = key.getSize();
+ if (keyLength < 0) {
+ throw new IOException("negative length keys not allowed: " + key);
+ }
+ if (compressor != null) {
+ CodecPool.returnCompressor(compressor);
+ }
+
+ // Write the key out
+ writeKey(key, keyLength + valueLength, keyLength);
+ // write the value out
+ if (isCompressed) {
+ out.write(valueBuffer.getData(), 0, valueBuffer.getLength());
+ } else {
+ for(int columnIndex=0; columnIndex < columnNumber; ++columnIndex) {
+ NonSyncDataOutputBuffer buf =
+ columnBuffers[columnIndex].columnValBuffer;
+ out.write(buf.getData(), 0, buf.getLength());
+ }
+ }
+
+ // clear the columnBuffers
+ clearColumnBuffers();
+
+ bufferedRecords = 0;
+ columnBufferSize = 0;
+ }
+
+ /**
+ * flush a block out without doing anything except compressing the key part.
+ */
+ public void flushBlock(KeyBuffer keyBuffer, ValueBuffer valueBuffer,
+ int recordLen, int keyLength,
+ @SuppressWarnings("unused") int compressedKeyLen) throws IOException {
+ writeKey(keyBuffer, recordLen, keyLength);
+ valueBuffer.write(out);
+ }
+
+ private void writeKey(KeyBuffer keyBuffer, int recordLen,
+ int keyLength) throws IOException {
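+      // On-disk record layout: [optional sync marker][record length][key length]
+      // [compressed key length][key bytes]; the value section is written by the
+      // caller after this method returns.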
+ checkAndWriteSync(); // sync
+ out.writeInt(recordLen); // total record length
+ out.writeInt(keyLength); // key portion length
+
+ if(this.isCompressed()) {
+ Compressor compressor = CodecPool.getCompressor(codec);
+ NonSyncDataOutputBuffer compressionBuffer =
+ new NonSyncDataOutputBuffer();
+ CompressionOutputStream deflateFilter =
+ codec.createOutputStream(compressionBuffer, compressor);
+ DataOutputStream deflateOut = new DataOutputStream(deflateFilter);
+ //compress key and write key out
+ compressionBuffer.reset();
+ deflateFilter.resetState();
+ keyBuffer.write(deflateOut);
+ deflateOut.flush();
+ deflateFilter.finish();
+ int compressedKeyLen = compressionBuffer.getLength();
+ out.writeInt(compressedKeyLen);
+ out.write(compressionBuffer.getData(), 0, compressedKeyLen);
+ CodecPool.returnCompressor(compressor);
+ } else {
+ out.writeInt(keyLength);
+ keyBuffer.write(out);
+ }
+ }
+
+ private void clearColumnBuffers() throws IOException {
+ for (int i = 0; i < columnNumber; i++) {
+ columnBuffers[i].clear();
+ }
+ }
+
+ public synchronized void close() throws IOException {
+ if (bufferedRecords > 0) {
+ flushRecords();
+ }
+ clearColumnBuffers();
+
+ if (out != null) {
+
+ // Close the underlying stream if we own it...
+ out.flush();
+ out.close();
+ out = null;
+ }
+ for (int i = 0; i < columnNumber; i++) {
+ LOG.info("Column#" + i + " : Plain Total Column Value Length: "
+ + plainTotalColumnLength[i]
+ + ", Compr Total Column Value Length: " + comprTotalColumnLength[i]);
+ }
+ }
+ }
+
+ /**
+ * Read KeyBuffer/ValueBuffer pairs from a RCFile.
+ *
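+   * A minimal usage sketch (fs, path and conf are placeholders supplied by the
+   * caller):
+   * <pre>{@code
+   *   RCFile.Reader reader = new RCFile.Reader(fs, path, conf);
+   *   LongWritable rowId = new LongWritable();
+   *   BytesRefArrayWritable row = new BytesRefArrayWritable();
+   *   while (reader.next(rowId)) {
+   *     reader.getCurrentRow(row);
+   *     // consume row ...
+   *   }
+   *   reader.close();
+   * }</pre>
+   *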
+ */
+ public static class Reader {
+ private static class SelectedColumn {
+ public int colIndex;
+ public int rowReadIndex;
+ public int runLength;
+ public int prvLength;
+ public boolean isNulled;
+ }
+ private final Path file;
+ private final FSDataInputStream in;
+
+ private byte version;
+
+ private CompressionCodec codec = null;
+ private Metadata metadata = null;
+
+ private final byte[] sync = new byte[SYNC_HASH_SIZE];
+ private final byte[] syncCheck = new byte[SYNC_HASH_SIZE];
+ private boolean syncSeen;
+ private long lastSeenSyncPos = 0;
+
+ private long headerEnd;
+ private final long end;
+ private int currentKeyLength;
+ private int currentRecordLength;
+
+ private final Configuration conf;
+
+ private final ValueBuffer currentValue;
+
+ private int readRowsIndexInBuffer = 0;
+
+ private int recordsNumInValBuffer = 0;
+
+ private int columnNumber = 0;
+
+ private int loadColumnNum;
+
+ private int passedRowsNum = 0;
+
+ // Should we try to tolerate corruption? Default is No.
+ private boolean tolerateCorruptions = false;
+
+ private boolean decompress = false;
+
+ private Decompressor keyDecompressor;
+ NonSyncDataOutputBuffer keyDecompressedData = new NonSyncDataOutputBuffer();
+
+ //Current state of each selected column - e.g. current run length, etc.
+ // The size of the array is equal to the number of selected columns
+ private final SelectedColumn[] selectedColumns;
+
+ // map of original column id -> index among selected columns
+ private final int[] revPrjColIDs;
+
+ // column value lengths for each of the selected columns
+ private final NonSyncDataInputBuffer[] colValLenBufferReadIn;
+
+ /** Create a new RCFile reader. */
+ public Reader(FileSystem fs, Path file, Configuration conf) throws IOException {
+ this(fs, file, conf.getInt("io.file.buffer.size", 4096), conf, 0, fs
+ .getFileStatus(file).getLen());
+ }
+
+ /** Create a new RCFile reader. */
+ public Reader(FileSystem fs, Path file, int bufferSize, Configuration conf,
+ long start, long length) throws IOException {
+ tolerateCorruptions = conf.getBoolean(
+ TOLERATE_CORRUPTIONS_CONF_STR, false);
+ conf.setInt("io.file.buffer.size", bufferSize);
+ this.file = file;
+ in = openFile(fs, file, bufferSize, length);
+ this.conf = conf;
+ end = start + length;
+ boolean succeed = false;
+ try {
+ if (start > 0) {
+ seek(0);
+ init();
+ seek(start);
+ } else {
+ init();
+ }
+ succeed = true;
+ } finally {
+ if (!succeed) {
+ if (in != null) {
+ try {
+ in.close();
+ } catch(IOException e) {
+ if (LOG != null && LOG.isDebugEnabled()) {
+ LOG.debug("Exception in closing " + in, e);
+ }
+ }
+ }
+ }
+ }
+
+ columnNumber = Integer.parseInt(metadata.get(
+ new Text(COLUMN_NUMBER_METADATA_STR)).toString());
+
+ List<Integer> notSkipIDs = ColumnProjectionUtils
+ .getReadColumnIDs(conf);
+ boolean[] skippedColIDs = new boolean[columnNumber];
+ if(ColumnProjectionUtils.isReadAllColumns(conf)) {
+ Arrays.fill(skippedColIDs, false);
+ } else if (notSkipIDs.size() > 0) {
+ Arrays.fill(skippedColIDs, true);
+ for (int read : notSkipIDs) {
+ if (read < columnNumber) {
+ skippedColIDs[read] = false;
+ }
+ }
+ } else {
+ // select count(1)
+ Arrays.fill(skippedColIDs, true);
+ }
+ loadColumnNum = columnNumber;
+ if (skippedColIDs.length > 0) {
+ for (boolean skippedColID : skippedColIDs) {
+ if (skippedColID) {
+ loadColumnNum -= 1;
+ }
+ }
+ }
+
+
+ revPrjColIDs = new int[columnNumber];
+ // get list of selected column IDs
+ selectedColumns = new SelectedColumn[loadColumnNum];
+ colValLenBufferReadIn = new NonSyncDataInputBuffer[loadColumnNum];
+ for (int i = 0, j = 0; i < columnNumber; ++i) {
+ if (!skippedColIDs[i]) {
+ SelectedColumn col = new SelectedColumn();
+ col.colIndex = i;
+ col.runLength = 0;
+ col.prvLength = -1;
+ col.rowReadIndex = 0;
+ selectedColumns[j] = col;
+ colValLenBufferReadIn[j] = new NonSyncDataInputBuffer();
+ revPrjColIDs[i] = j;
+ j++;
+ } else {
+ revPrjColIDs[i] = -1;
+ }
+ }
+
+ currentKey = createKeyBuffer();
+ boolean lazyDecompress = !tolerateCorruptions;
+ currentValue = new ValueBuffer(
+ null, columnNumber, skippedColIDs, codec, lazyDecompress);
+ }
+
+ /**
+ * Return the metadata (Text to Text map) that was written into the
+ * file.
+ */
+ public Metadata getMetadata() {
+ return metadata;
+ }
+
+ /**
+ * Return the metadata value associated with the given key.
+ * @param key the metadata key to retrieve
+ */
+ public Text getMetadataValueOf(Text key) {
+ return metadata.get(key);
+ }
+
+ /**
+ * Override this method to specialize the type of
+ * {@link FSDataInputStream} returned.
+ */
+ protected FSDataInputStream openFile(FileSystem fs, Path file,
+ int bufferSize, long length) throws IOException {
+ return fs.open(file, bufferSize);
+ }
+
+ private void init() throws IOException {
+ byte[] magic = new byte[MAGIC.length];
+ in.readFully(magic);
+
+ if (Arrays.equals(magic, ORIGINAL_MAGIC)) {
+ byte vers = in.readByte();
+ if (vers != ORIGINAL_MAGIC_VERSION_WITH_METADATA) {
+ throw new IOException(file + " is a version " + vers +
+ " SequenceFile instead of an RCFile.");
+ }
+ version = ORIGINAL_VERSION;
+ } else {
+ if (!Arrays.equals(magic, MAGIC)) {
+ throw new IOException(file + " not a RCFile and has magic of " +
+ new String(magic));
+ }
+
+ // Set 'version'
+ version = in.readByte();
+ if (version > CURRENT_VERSION) {
+ throw new VersionMismatchException((byte) CURRENT_VERSION, version);
+ }
+ }
+
+ if (version == ORIGINAL_VERSION) {
+ try {
+ Class<?> keyCls = conf.getClassByName(Text.readString(in));
+ Class<?> valCls = conf.getClassByName(Text.readString(in));
+ if (!keyCls.equals(KeyBuffer.class)
+ || !valCls.equals(ValueBuffer.class)) {
+ throw new IOException(file + " not a RCFile");
+ }
+ } catch (ClassNotFoundException e) {
+ throw new IOException(file + " not a RCFile", e);
+ }
+ }
+
+ decompress = in.readBoolean(); // is compressed?
+
+ if (version == ORIGINAL_VERSION) {
+        // is block-compressed? it should always be false.
+ boolean blkCompressed = in.readBoolean();
+ if (blkCompressed) {
+ throw new IOException(file + " not a RCFile.");
+ }
+ }
+
+ // setup the compression codec
+ if (decompress) {
+ String codecClassname = Text.readString(in);
+ try {
+ Class<? extends CompressionCodec> codecClass = conf.getClassByName(
+ codecClassname).asSubclass(CompressionCodec.class);
+ codec = ReflectionUtils.newInstance(codecClass, conf);
+ } catch (ClassNotFoundException cnfe) {
+ throw new IllegalArgumentException(
+ "Unknown codec: " + codecClassname, cnfe);
+ }
+ keyDecompressor = CodecPool.getDecompressor(codec);
+ }
+
+ metadata = new Metadata();
+ metadata.readFields(in);
+
+ in.readFully(sync); // read sync bytes
+ headerEnd = in.getPos();
+ }
+
+ /** Return the current byte position in the input file. */
+ public synchronized long getPosition() throws IOException {
+ return in.getPos();
+ }
+
+ /**
+ * Set the current byte position in the input file.
+ *
+ * <p>
+ * The position passed must be a position returned by
+ * {@link RCFile.Writer#getLength()} when writing this file. To seek to an
+     * arbitrary position, use {@link RCFile.Reader#sync(long)}. In other
+ * words, the current seek can only seek to the end of the file. For other
+ * positions, use {@link RCFile.Reader#sync(long)}.
+ */
+ public synchronized void seek(long position) throws IOException {
+ in.seek(position);
+ }
+
+ /**
+     * Resets the values which determine if there are more rows in the buffer.
+     *
+     * This can be used after seek() or sync(), if next() was called before that.
+     * Otherwise, the seek or sync will have no effect: the reader will keep
+     * returning rows from the buffer built up by the earlier call to next().
+ */
+ public synchronized void resetBuffer() {
+ readRowsIndexInBuffer = 0;
+ recordsNumInValBuffer = 0;
+ }
+
+ /** Seek to the next sync mark past a given position. */
+ public synchronized void sync(long position) throws IOException {
+ if (position + SYNC_SIZE >= end) {
+ seek(end);
+ return;
+ }
+
+      // this is to handle sync(pos) where pos < headerEnd.
+ if (position < headerEnd) {
+ // seek directly to first record
+ in.seek(headerEnd);
+ // note the sync marker "seen" in the header
+ syncSeen = true;
+ return;
+ }
+
+ try {
+ seek(position + 4); // skip escape
+
+ int prefix = sync.length;
+ int n = conf.getInt("io.bytes.per.checksum", 512);
+ byte[] buffer = new byte[prefix+n];
+ n = (int)Math.min(n, end - in.getPos());
+ /* fill array with a pattern that will never match sync */
+ Arrays.fill(buffer, (byte)(~sync[0]));
+ while(n > 0 && (in.getPos() + n) <= end) {
+ position = in.getPos();
+ in.readFully(buffer, prefix, n);
+ /* the buffer has n+sync bytes */
+ for(int i = 0; i < n; i++) {
+ int j;
+ for(j = 0; j < sync.length && sync[j] == buffer[i+j]; j++) {
+ /* nothing */
+ }
+ if(j == sync.length) {
+ /* simplified from (position + (i - prefix) + sync.length) - SYNC_SIZE */
+ in.seek(position + i - SYNC_SIZE);
+ return;
+ }
+ }
+ /* move the last 16 bytes to the prefix area */
+ System.arraycopy(buffer, buffer.length - prefix, buffer, 0, prefix);
+ n = (int)Math.min(n, end - in.getPos());
+ }
+ } catch (ChecksumException e) { // checksum failure
+ handleChecksumException(e);
+ }
+ }
+
+ private void handleChecksumException(ChecksumException e) throws IOException {
+ if (conf.getBoolean("io.skip.checksum.errors", false)) {
+ LOG.warn("Bad checksum at " + getPosition() + ". Skipping entries.");
+ sync(getPosition() + conf.getInt("io.bytes.per.checksum", 512));
+ } else {
+ throw e;
+ }
+ }
+
+ private KeyBuffer createKeyBuffer() {
+ return new KeyBuffer(columnNumber);
+ }
+
+ /**
+ * Read and return the next record length, potentially skipping over a sync
+ * block.
+ *
+ * @return the length of the next record or -1 if there is no next record
+ * @throws IOException
+ */
+ private synchronized int readRecordLength() throws IOException {
+ if (in.getPos() >= end) {
+ return -1;
+ }
+ int length = in.readInt();
+      if (sync != null && length == SYNC_ESCAPE) { // process a sync entry
+ lastSeenSyncPos = in.getPos() - 4; // minus SYNC_ESCAPE's length
+ in.readFully(syncCheck); // read syncCheck
+ if (!Arrays.equals(sync, syncCheck)) {
+ throw new IOException("File is corrupt!");
+ }
+ syncSeen = true;
+ if (in.getPos() >= end) {
+ return -1;
+ }
+ length = in.readInt(); // re-read length
+ } else {
+ syncSeen = false;
+ }
+ return length;
+ }
+
+ private void seekToNextKeyBuffer() throws IOException {
+ if (!keyInit) {
+ return;
+ }
+ if (!currentValue.inited) {
+ in.skip(currentRecordLength - currentKeyLength);
+ }
+ }
+
+ private int compressedKeyLen = 0;
+ NonSyncDataInputBuffer keyDataIn = new NonSyncDataInputBuffer();
+ NonSyncDataInputBuffer keyDecompressBuffer = new NonSyncDataInputBuffer();
+ NonSyncDataOutputBuffer keyTempBuffer = new NonSyncDataOutputBuffer();
+
+ KeyBuffer currentKey = null;
+ boolean keyInit = false;
+
+ protected int nextKeyBuffer() throws IOException {
+ seekToNextKeyBuffer();
+ currentRecordLength = readRecordLength();
+ if (currentRecordLength == -1) {
+ keyInit = false;
+ return -1;
+ }
+ currentKeyLength = in.readInt();
+ compressedKeyLen = in.readInt();
+ if (decompress) {
+ keyTempBuffer.reset();
+ keyTempBuffer.write(in, compressedKeyLen);
+ keyDecompressBuffer.reset(keyTempBuffer.getData(), compressedKeyLen);
+ CompressionInputStream deflatFilter = codec.createInputStream(
+ keyDecompressBuffer, keyDecompressor);
+ DataInputStream compressedIn = new DataInputStream(deflatFilter);
+ deflatFilter.resetState();
+ keyDecompressedData.reset();
+ keyDecompressedData.write(compressedIn, currentKeyLength);
+ keyDataIn.reset(keyDecompressedData.getData(), currentKeyLength);
+ currentKey.readFields(keyDataIn);
+ } else {
+ currentKey.readFields(in);
+ }
+
+ keyInit = true;
+ currentValue.inited = false;
+
+ readRowsIndexInBuffer = 0;
+ recordsNumInValBuffer = currentKey.numberRows;
+
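+      // Rewind each selected column's cell-length buffer from the freshly read key
+      // and clear its run-length decoding state; isNulled marks columns with no
+      // length data in this block.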
+ for (int selIx = 0; selIx < selectedColumns.length; selIx++) {
+ SelectedColumn col = selectedColumns[selIx];
+ int colIx = col.colIndex;
+ NonSyncDataOutputBuffer buf = currentKey.allCellValLenBuffer[colIx];
+ colValLenBufferReadIn[selIx].reset(buf.getData(), buf.getLength());
+ col.rowReadIndex = 0;
+ col.runLength = 0;
+ col.prvLength = -1;
+ col.isNulled = colValLenBufferReadIn[selIx].getLength() == 0;
+ }
+
+ return currentKeyLength;
+ }
+
+ protected void currentValueBuffer() throws IOException {
+ if (!keyInit) {
+ nextKeyBuffer();
+ }
+ currentValue.keyBuffer = currentKey;
+ currentValue.clearColumnBuffer();
+ currentValue.readFields(in);
+ currentValue.inited = true;
+ }
+
+ public boolean nextBlock() throws IOException {
+ int keyLength = nextKeyBuffer();
+ if(keyLength > 0) {
+ currentValueBuffer();
+ return true;
+ }
+ return false;
+ }
+
+ private boolean rowFetched = false;
+
+    // use this buffer to hold the column's cell value lengths for use in
+    // getColumn(), instead of using colValLenBufferReadIn directly.
+ private final NonSyncDataInputBuffer fetchColumnTempBuf = new NonSyncDataInputBuffer();
+
+ /**
+ * Fetch all data in the buffer for a given column. This is useful for
+     * columnar operators, which perform operations on arrays of values from a
+     * single column. It should be used together with {@link #nextColumnsBatch()}.
+     * Calling getColumn() will not change the result of
+ * {@link #next(LongWritable)} and
+ * {@link #getCurrentRow(BytesRefArrayWritable)}.
+ *
+     * @param columnID the number of the column to get, from 0 to N-1
+ * @throws IOException
+ */
+ public BytesRefArrayWritable getColumn(int columnID,
+ BytesRefArrayWritable rest) throws IOException {
+ int selColIdx = revPrjColIDs[columnID];
+ if (selColIdx == -1) {
+ return null;
+ }
+
+ if (rest == null) {
+ rest = new BytesRefArrayWritable();
+ }
+
+ rest.resetValid(recordsNumInValBuffer);
+
+ if (!currentValue.inited) {
+ currentValueBuffer();
+ }
+
+ int columnNextRowStart = 0;
+ fetchColumnTempBuf.reset(currentKey.allCellValLenBuffer[columnID]
+ .getData(), currentKey.allCellValLenBuffer[columnID].getLength());
+ SelectedColumn selCol = selectedColumns[selColIdx];
+ byte[] uncompData = null;
+ ValueBuffer.LazyDecompressionCallbackImpl decompCallBack = null;
+ boolean decompressed = currentValue.decompressedFlag[selColIdx];
+ if (decompressed) {
+ uncompData =
+ currentValue.loadedColumnsValueBuffer[selColIdx].getData();
+ } else {
+ decompCallBack = currentValue.lazyDecompressCallbackObjs[selColIdx];
+ }
+ for (int i = 0; i < recordsNumInValBuffer; i++) {
+ colAdvanceRow(selColIdx, selCol);
+ int length = selCol.prvLength;
+
+ BytesRefWritable currentCell = rest.get(i);
+
+ if (decompressed) {
+ currentCell.set(uncompData, columnNextRowStart, length);
+ } else {
+ currentCell.set(decompCallBack, columnNextRowStart, length);
+ }
+ columnNextRowStart = columnNextRowStart + length;
+ }
+ return rest;
+ }
+
+ /**
+     * Read in the next key buffer and throw away any data in the current key
+     * buffer and current value buffer. It will influence the result of
+     * {@link #next(LongWritable)} and
+     * {@link #getCurrentRow(BytesRefArrayWritable)}.
+     *
+     * @return whether there are still records to read
+ * @throws IOException
+ */
+ @SuppressWarnings("unused")
+ @Deprecated
+ public synchronized boolean nextColumnsBatch() throws IOException {
+ passedRowsNum += (recordsNumInValBuffer - readRowsIndexInBuffer);
+ return nextKeyBuffer() > 0;
+ }
+
+ /**
+     * Advances to the next row and reports, through readRows, how many rows
+     * have been fetched with next() so far. The reported count may be smaller
+     * than the actual number of rows passed by, because {@link #seek(long)} and
+     * {@link #nextColumnsBatch()} can change the underlying key buffer and
+     * value buffer.
+     *
+     * @return true if there is a next row to read
+ * @throws IOException
+ */
+ public synchronized boolean next(LongWritable readRows) throws IOException {
+ if (hasRecordsInBuffer()) {
+ readRows.set(passedRowsNum);
+ readRowsIndexInBuffer++;
+ passedRowsNum++;
+ rowFetched = false;
+ return true;
+ } else {
+ keyInit = false;
+ }
+
+ int ret = -1;
+ if (tolerateCorruptions) {
+ ret = nextKeyValueTolerateCorruptions();
+ } else {
+ try {
+ ret = nextKeyBuffer();
+ } catch (EOFException eof) {
+ eof.printStackTrace();
+ }
+ }
+ return (ret > 0) && next(readRows);
+ }
+
+ private int nextKeyValueTolerateCorruptions() throws IOException {
+ long currentOffset = in.getPos();
+ int ret;
+ try {
+ ret = nextKeyBuffer();
+ this.currentValueBuffer();
+ } catch (IOException ioe) {
+ // A BlockMissingException indicates a temporary error,
+ // not a corruption. Re-throw this exception.
+ String msg = ioe.getMessage();
+ if (msg != null && msg.startsWith(BLOCK_MISSING_MESSAGE)) {
+ LOG.warn("Re-throwing block-missing exception" + ioe);
+ throw ioe;
+ }
+ // We have an IOException other than a BlockMissingException.
+ LOG.warn("Ignoring IOException in file " + file +
+ " after offset " + currentOffset, ioe);
+ ret = -1;
+ } catch (Throwable t) {
+ // We got an exception that is not IOException
+ // (typically OOM, IndexOutOfBounds, InternalError).
+ // This is most likely a corruption.
+ LOG.warn("Ignoring unknown error in " + file +
+ " after offset " + currentOffset, t);
+ ret = -1;
+ }
+ return ret;
+ }
+
+ public boolean hasRecordsInBuffer() {
+ return readRowsIndexInBuffer < recordsNumInValBuffer;
+ }
+
+ /**
+     * Get the current row; make sure {@link #next(LongWritable)} has been
+     * called first.
+ *
+ * @throws IOException
+ */
+ public synchronized void getCurrentRow(BytesRefArrayWritable ret) throws IOException {
+
+ if (!keyInit || rowFetched) {
+ return;
+ }
+
+ if (tolerateCorruptions) {
+ if (!currentValue.inited) {
+ currentValueBuffer();
+ }
+ ret.resetValid(columnNumber);
+ } else {
+ if (!currentValue.inited) {
+ currentValueBuffer();
+ // do this only when not initialized, but we may need to find a way to
+ // tell the caller how to initialize the valid size
+ ret.resetValid(columnNumber);
+ }
+ }
+
+ // we do not use BytesWritable here to avoid the byte-copy from
+ // DataOutputStream to BytesWritable
+ if (currentValue.numCompressed > 0) {
+ for (int j = 0; j < selectedColumns.length; ++j) {
+ SelectedColumn col = selectedColumns[j];
+ int i = col.colIndex;
+
+ if (col.isNulled) {
+ ret.set(i, null);
+ } else {
+ BytesRefWritable ref = ret.unCheckedGet(i);
+
+ colAdvanceRow(j, col);
+
+ if (currentValue.decompressedFlag[j]) {
+ ref.set(currentValue.loadedColumnsValueBuffer[j].getData(),
+ col.rowReadIndex, col.prvLength);
+ } else {
+ ref.set(currentValue.lazyDecompressCallbackObjs[j],
+ col.rowReadIndex, col.prvLength);
+ }
+ col.rowReadIndex += col.prvLength;
+ }
+ }
+ } else {
+ // This version of the loop eliminates a condition check and branch
+ // and is measurably faster (20% or so)
+ for (int j = 0; j < selectedColumns.length; ++j) {
+ SelectedColumn col = selectedColumns[j];
+ int i = col.colIndex;
+
+ if (col.isNulled) {
+ ret.set(i, null);
+ } else {
+ BytesRefWritable ref = ret.unCheckedGet(i);
+
+ colAdvanceRow(j, col);
+ ref.set(currentValue.loadedColumnsValueBuffer[j].getData(),
+ col.rowReadIndex, col.prvLength);
+ col.rowReadIndex += col.prvLength;
+ }
+ }
+ }
+ rowFetched = true;
+ }
+
+ /**
+     * Advance column state to the next row: update offsets, run lengths, etc.
+     * @param selCol - index among selectedColumns
+     * @param col - column object to update the state of. prvLength will be
+     *              set to the length of the current cell
+ * @throws IOException
+ */
+ private void colAdvanceRow(int selCol, SelectedColumn col) throws IOException {
+ if (col.runLength > 0) {
+ --col.runLength;
+ } else {
+ int length = (int) WritableUtils.readVLong(colValLenBufferReadIn[selCol]);
+ if (length < 0) {
+          // a negative value encodes a run: keep the previous cell length and
+          // count down the remaining repeats
+ col.runLength = (~length) - 1;
+ } else {
+ col.prvLength = length;
+ col.runLength = 0;
+ }
+ }
+ }
+
+ /** Returns true iff the previous call to next passed a sync mark. */
+ @SuppressWarnings("unused")
+ public boolean syncSeen() {
+ return syncSeen;
+ }
+
+ /** Returns the last seen sync position. */
+ public long lastSeenSyncPos() {
+ return lastSeenSyncPos;
+ }
+
+ /** Returns the name of the file. */
+ @Override
+ public String toString() {
+ return file.toString();
+ }
+
+ @SuppressWarnings("unused")
+ public boolean isCompressedRCFile() {
+ return this.decompress;
+ }
+
+ /** Close the reader. */
+ public void close() {
+ IOUtils.closeStream(in);
+ currentValue.close();
+ if (decompress) {
+ IOUtils.closeStream(keyDecompressedData);
+ if (keyDecompressor != null) {
+ // Make sure we only return keyDecompressor once.
+ CodecPool.returnDecompressor(keyDecompressor);
+ keyDecompressor = null;
+ }
+ }
+ }
+
+ /**
+ * return the KeyBuffer object used in the reader. Internally in each
+ * reader, there is only one KeyBuffer object, which gets reused for every
+ * block.
+ */
+ public KeyBuffer getCurrentKeyBufferObj() {
+ return this.currentKey;
+ }
+
+ /**
+ * return the ValueBuffer object used in the reader. Internally in each
+ * reader, there is only one ValueBuffer object, which gets reused for every
+ * block.
+ */
+ public ValueBuffer getCurrentValueBufferObj() {
+ return this.currentValue;
+ }
+
+ //return the current block's length
+ public int getCurrentBlockLength() {
+ return this.currentRecordLength;
+ }
+
+ //return the current block's key length
+ public int getCurrentKeyLength() {
+ return this.currentKeyLength;
+ }
+
+ //return the current block's compressed key length
+ public int getCurrentCompressedKeyLen() {
+ return this.compressedKeyLen;
+ }
+
+ //return the CompressionCodec used for this file
+ public CompressionCodec getCompressionCodec() {
+ return this.codec;
+ }
+
+ }
+}
diff --git a/asterixdb/asterix-hivecompat/src/main/java/org/apache/asterix/hivecompat/io/RCFileInputFormat.java b/asterixdb/asterix-hivecompat/src/main/java/org/apache/asterix/hivecompat/io/RCFileInputFormat.java
new file mode 100644
index 0000000..19a8f8a
--- /dev/null
+++ b/asterixdb/asterix-hivecompat/src/main/java/org/apache/asterix/hivecompat/io/RCFileInputFormat.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.asterix.hivecompat.io;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+
+/**
+ * RCFileInputFormat.
+ *
+ * @param <K>
+ * @param <V>
+ */
+public class RCFileInputFormat<K extends LongWritable, V extends BytesRefArrayWritable>
+ extends FileInputFormat<K, V> implements InputFormatChecker {
+
+ public RCFileInputFormat() {
+ setMinSplitSize(SequenceFile.SYNC_INTERVAL);
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public RecordReader<K, V> getRecordReader(InputSplit split, JobConf job,
+ Reporter reporter) throws IOException {
+
+ reporter.setStatus(split.toString());
+
+ return new RCFileRecordReader(job, (FileSplit) split);
+ }
+
+ @Override
+ public boolean validateInput(FileSystem fs, HiveConf conf,
+ ArrayList<FileStatus> files) throws IOException {
+ if (files.size() <= 0) {
+ return false;
+ }
+ for (int fileId = 0; fileId < files.size(); fileId++) {
+ RCFile.Reader reader = null;
+ try {
+ reader = new RCFile.Reader(fs, files.get(fileId)
+ .getPath(), conf);
+ reader.close();
+ reader = null;
+ } catch (IOException e) {
+ return false;
+ } finally {
+ if (null != reader) {
+ reader.close();
+ }
+ }
+ }
+ return true;
+ }
+}
diff --git a/asterixdb/asterix-hivecompat/src/main/java/org/apache/asterix/hivecompat/io/RCFileRecordReader.java b/asterixdb/asterix-hivecompat/src/main/java/org/apache/asterix/hivecompat/io/RCFileRecordReader.java
new file mode 100644
index 0000000..6ddba27
--- /dev/null
+++ b/asterixdb/asterix-hivecompat/src/main/java/org/apache/asterix/hivecompat/io/RCFileRecordReader.java
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.asterix.hivecompat.io;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.WeakHashMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.asterix.hivecompat.io.RCFile.KeyBuffer;
+import org.apache.asterix.hivecompat.io.RCFile.Reader;
+import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * RCFileRecordReader.
+ *
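+ * A minimal usage sketch (conf and split are placeholders supplied by the
+ * enclosing InputFormat):
+ * <pre>{@code
+ *   RCFileRecordReader<LongWritable, BytesRefArrayWritable> rr =
+ *       new RCFileRecordReader<LongWritable, BytesRefArrayWritable>(conf, split);
+ *   LongWritable key = rr.createKey();
+ *   BytesRefArrayWritable value = rr.createValue();
+ *   while (rr.next(key, value)) {
+ *     // consume value ...
+ *   }
+ *   rr.close();
+ * }</pre>
+ *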
+ * @param <K>
+ * @param <V>
+ */
+public class RCFileRecordReader<K extends LongWritable, V extends BytesRefArrayWritable>
+ implements RecordReader<LongWritable, BytesRefArrayWritable> {
+
+ private final Reader in;
+ private final long start;
+ private final long end;
+ private boolean more = true;
+ protected Configuration conf;
+ private final FileSplit split;
+ private final boolean useCache;
+
+ private static RCFileSyncCache syncCache = new RCFileSyncCache();
+
+ private static final class RCFileSyncEntry {
+ long end;
+ long endSync;
+ }
+
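+  // Caches, per (file path, offset) key, the sync position found past a split's
+  // end so that the reader of the adjacent split, whose start offset equals that
+  // end, can seek straight to it instead of scanning for the next sync marker.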
+ private static final class RCFileSyncCache {
+
+ private final Map<String, RCFileSyncEntry> cache;
+
+ public RCFileSyncCache() {
+ cache = Collections.synchronizedMap(new WeakHashMap<String, RCFileSyncEntry>());
+ }
+
+ public void put(FileSplit split, long endSync) {
+ Path path = split.getPath();
+ long end = split.getStart() + split.getLength();
+ String key = path.toString()+"+"+String.format("%d",end);
+
+ RCFileSyncEntry entry = new RCFileSyncEntry();
+ entry.end = end;
+ entry.endSync = endSync;
+ if(entry.endSync >= entry.end) {
+ cache.put(key, entry);
+ }
+ }
+
+ public long get(FileSplit split) {
+ Path path = split.getPath();
+ long start = split.getStart();
+ String key = path.toString()+"+"+String.format("%d",start);
+ RCFileSyncEntry entry = cache.get(key);
+ if(entry != null) {
+ return entry.endSync;
+ }
+ return -1;
+ }
+ }
+
+ public RCFileRecordReader(Configuration conf, FileSplit split)
+ throws IOException {
+
+ Path path = split.getPath();
+ FileSystem fs = path.getFileSystem(conf);
+ this.in = new RCFile.Reader(fs, path, conf);
+ this.end = split.getStart() + split.getLength();
+ this.conf = conf;
+ this.split = split;
+
+ useCache = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVEUSERCFILESYNCCACHE);
+
+ if (split.getStart() > in.getPosition()) {
+ long oldSync = useCache ? syncCache.get(split) : -1;
+ if(oldSync == -1) {
+ in.sync(split.getStart()); // sync to start
+ } else {
+ in.seek(oldSync);
+ }
+ }
+
+ this.start = in.getPosition();
+
+ more = start < end;
+ }
+
+ public Class<?> getKeyClass() {
+ return LongWritable.class;
+ }
+
+ public Class<?> getValueClass() {
+ return BytesRefArrayWritable.class;
+ }
+
+ public LongWritable createKey() {
+ return (LongWritable) ReflectionUtils.newInstance(getKeyClass(), conf);
+ }
+
+ public BytesRefArrayWritable createValue() {
+ return (BytesRefArrayWritable) ReflectionUtils.newInstance(getValueClass(),
+ conf);
+ }
+
+ public boolean nextBlock() throws IOException {
+ return in.nextBlock();
+ }
+
+ @Override
+ public boolean next(LongWritable key, BytesRefArrayWritable value)
+ throws IOException {
+
+ more = next(key);
+
+ if (more) {
+ in.getCurrentRow(value);
+ }
+ return more;
+ }
+
+ protected boolean next(LongWritable key) throws IOException {
+ if (!more) {
+ return false;
+ }
+
+ more = in.next(key);
+
+ long lastSeenSyncPos = in.lastSeenSyncPos();
+
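+    // A sync marker at or past the split end means this split is done; cache the
+    // position so the reader of the adjacent split can seek directly to it.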
+ if (lastSeenSyncPos >= end) {
+ if(useCache) {
+ syncCache.put(split, lastSeenSyncPos);
+ }
+ more = false;
+ return more;
+ }
+ return more;
+ }
+
+ /**
+ * Return the progress within the input split.
+ *
+ * @return 0.0 to 1.0 of the input byte range
+ */
+ public float getProgress() throws IOException {
+ if (end == start) {
+ return 0.0f;
+ } else {
+ return Math.min(1.0f, (in.getPosition() - start) / (float) (end - start));
+ }
+ }
+
+ public long getPos() throws IOException {
+ return in.getPosition();
+ }
+
+ public KeyBuffer getKeyBuffer() {
+ return in.getCurrentKeyBufferObj();
+ }
+
+ protected void seek(long pos) throws IOException {
+ in.seek(pos);
+ }
+
+ public void sync(long pos) throws IOException {
+ in.sync(pos);
+ }
+
+ public void resetBuffer() {
+ in.resetBuffer();
+ }
+
+ public long getStart() {
+ return start;
+ }
+
+ public void close() throws IOException {
+ in.close();
+ }
+}
diff --git a/asterixdb/asterix-hivecompat/src/main/java/org/apache/asterix/hivecompat/io/SchemaAwareCompressionInputStream.java b/asterixdb/asterix-hivecompat/src/main/java/org/apache/asterix/hivecompat/io/SchemaAwareCompressionInputStream.java
new file mode 100644
index 0000000..f1eb555
--- /dev/null
+++ b/asterixdb/asterix-hivecompat/src/main/java/org/apache/asterix/hivecompat/io/SchemaAwareCompressionInputStream.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.asterix.hivecompat.io;
+
+import java.io.InputStream;
+
+import org.apache.hadoop.io.compress.*;
+/**
+ *
+ * SchemaAwareCompressionInputStream adds the ability to inform the compression
+ * stream what column is being read.
+ *
+ */
+public abstract class SchemaAwareCompressionInputStream extends CompressionInputStream {
+
+ protected SchemaAwareCompressionInputStream(InputStream in) throws java.io.IOException {
+ super(in);
+ }
+
+ /**
+ * The column being read
+ *
+ * @param columnIndex the index of the column. Use -1 for non-column data
+ */
+ public abstract void setColumnIndex(int columnIndex);
+}
diff --git a/asterixdb/asterix-hivecompat/src/main/java/org/apache/asterix/hivecompat/io/SchemaAwareCompressionOutputStream.java b/asterixdb/asterix-hivecompat/src/main/java/org/apache/asterix/hivecompat/io/SchemaAwareCompressionOutputStream.java
new file mode 100644
index 0000000..68f95a3
--- /dev/null
+++ b/asterixdb/asterix-hivecompat/src/main/java/org/apache/asterix/hivecompat/io/SchemaAwareCompressionOutputStream.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.asterix.hivecompat.io;
+
+import java.io.OutputStream;
+
+import org.apache.hadoop.io.compress.*;
+
+/**
+ *
+ * SchemaAwareCompressionOutputStream adds the ability to inform the compression
+ * stream which column is currently being compressed.
+ *
+ */
+public abstract class SchemaAwareCompressionOutputStream extends CompressionOutputStream {
+
+ protected SchemaAwareCompressionOutputStream(OutputStream out) {
+ super(out);
+ }
+
+ /**
+ *
+ * The column being output
+ *
+ * @param columnIndex the index of the column. Use -1 for non-column data
+ */
+ public abstract void setColumnIndex(int columnIndex);
+}
diff --git a/asterixdb/asterix-yarn/pom.xml b/asterixdb/asterix-yarn/pom.xml
index 4e6ce2a..ed476ca 100644
--- a/asterixdb/asterix-yarn/pom.xml
+++ b/asterixdb/asterix-yarn/pom.xml
@@ -153,8 +153,8 @@
<include>org.apache.httpcomponents:httpclient</include>
<include>org.htrace:htrace-core</include>
<include>commons-httpclient:commons-httpclient</include>
- <include>com.google.guava:guava</include>
<include>com.google.protobuf:protobuf-java</include>
+ <include>com.google.guava:guava</include>
</includes>
<location>lib</location>
</dependencySet>
@@ -201,7 +201,6 @@
<ignoredDependency>org.apache.hive:hive-exec:*</ignoredDependency>
</ignoredDependencies>
<usedDependencies>
- <usedDependency>com.google.guava:guava</usedDependency>
<usedDependency>commons-codec:commons-codec</usedDependency>
<usedDependency>commons-collections:commons-collections</usedDependency>
<usedDependency>commons-configuration:commons-configuration</usedDependency>
@@ -459,7 +458,6 @@
<dependency>
<groupId>commons-logging</groupId>
<artifactId>commons-logging-api</artifactId>
- <version>1.0.4</version>
</dependency>
</dependencies>
</project>
diff --git a/asterixdb/pom.xml b/asterixdb/pom.xml
index a0f9633..043f801 100644
--- a/asterixdb/pom.xml
+++ b/asterixdb/pom.xml
@@ -601,6 +601,7 @@
<module>asterix-runtime</module>
<module>asterix-om</module>
<module>asterix-external-data</module>
+ <module>asterix-hivecompat</module>
<module>asterix-examples</module>
<module>asterix-metadata</module>
<module>asterix-test-framework</module>
@@ -681,6 +682,12 @@
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>commons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
@@ -716,6 +723,44 @@
</exclusions>
</dependency>
<dependency>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-serde</artifactId>
+ <version>0.13.0</version>
+ <exclusions>
+ <exclusion>
+ <groupId>commons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>commons-logging</groupId>
+ <artifactId>commons-logging-api</artifactId>
+ <version>1.0.4</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hive.shims</groupId>
+ <artifactId>hive-shims-common</artifactId>
+ <version>0.13.0</version>
+ <exclusions>
+ <exclusion>
+ <groupId>commons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-common</artifactId>
+ <version>0.13.0</version>
+ <exclusions>
+ <exclusion>
+ <groupId>commons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
<groupId>org.apache.hyracks</groupId>
<artifactId>algebricks-common</artifactId>
<version>${algebricks.version}</version>