This change includes the following:
1. allowing short-circuit reads to be used with external data.
2. enhancing behaviour of index access for Text data.
3. updating the external data documentation.
Rebased on current Master.
The following commits from your working branch will be included:
commit 3176d741b8ef187703c7346044c9531a99f2b716
Author: Abdullah Alamoudi <bamousaa@gmail.com>
Date: Wed Feb 4 18:32:24 2015 +0300
added changes from external experiments and reflected external data documentation to include external data indexing
Change-Id: I041c71391d8704cd800c4446a085beef197e7acf
Reviewed-on: https://asterix-gerrit.ics.uci.edu/218
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Raman Grover <ramangrover29@gmail.com>
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
index a93461d..99d883c 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
@@ -124,6 +124,7 @@
webServer.stop();
jsonAPIServer.stop();
+ feedServer.stop();
}
private IHyracksClientConnection getNewHyracksClientConnection() throws Exception {
diff --git a/asterix-app/src/test/resources/runtimets/queries/hdfs/hdfs_shortcircuit/hdfs_shortcircuit.1.ddl.aql b/asterix-app/src/test/resources/runtimets/queries/hdfs/hdfs_shortcircuit/hdfs_shortcircuit.1.ddl.aql
new file mode 100644
index 0000000..8f2b89f
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/hdfs/hdfs_shortcircuit/hdfs_shortcircuit.1.ddl.aql
@@ -0,0 +1,20 @@
+/*
+* Description : Create an external dataset that contains a tuples, the lines from a (*sequence*) file in HDFS.
+ Perform a word-count over the data in the dataset.
+ The external dataset is set to perform local reads (but this is not checked)
+* Expected Res : Success
+* Date : 6th Mar 2015
+*/
+drop dataverse test if exists;
+create dataverse test;
+
+use dataverse test;
+
+create type LineType as closed {
+ content: string
+};
+
+
+create external dataset TextDataset(LineType)
+using hdfs
+(("hdfs"="hdfs://127.0.0.1:31888"),("path"="/asterix/textFileS"),("input-format"="sequence-input-format"),("format"="delimited-text"),("delimiter"="."),("local-socket-path"="/var/lib/hadoop-hdfs/dn_socket"));
diff --git a/asterix-app/src/test/resources/runtimets/queries/hdfs/hdfs_shortcircuit/hdfs_shortcircuit.2.update.aql b/asterix-app/src/test/resources/runtimets/queries/hdfs/hdfs_shortcircuit/hdfs_shortcircuit.2.update.aql
new file mode 100644
index 0000000..1152cf7
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/hdfs/hdfs_shortcircuit/hdfs_shortcircuit.2.update.aql
@@ -0,0 +1,8 @@
+/*
+* Description : Create an external dataset that contains a tuples, the lines from a (*sequence*) file in HDFS.
+ Perform a word-count over the data in the dataset.
+ The external dataset is set to perform local reads (but this is not checked)
+* Expected Res : Success
+* Date : 6th Mar 2015
+*/
+
diff --git a/asterix-app/src/test/resources/runtimets/queries/hdfs/hdfs_shortcircuit/hdfs_shortcircuit.3.query.aql b/asterix-app/src/test/resources/runtimets/queries/hdfs/hdfs_shortcircuit/hdfs_shortcircuit.3.query.aql
new file mode 100644
index 0000000..27692df
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/hdfs/hdfs_shortcircuit/hdfs_shortcircuit.3.query.aql
@@ -0,0 +1,15 @@
+/*
+* Description : Create an external dataset that contains a tuples, the lines from a (*sequence*) file in HDFS.
+ Perform a word-count over the data in the dataset.
+ The external dataset is set to perform local reads (but this is not checked)
+* Expected Res : Success
+* Date : 6th Mar 2015
+*/
+use dataverse test;
+
+for $line in dataset('TextDataset')
+let $tokens := word-tokens($line.content)
+for $token in $tokens
+group by $tok := $token with $token
+order by $tok
+return { "word": $tok, "count": count($token) }
diff --git a/asterix-app/src/test/resources/runtimets/results/hdfs/hdfs_shortcircuit/hdfs_shortcircuit.1.adm b/asterix-app/src/test/resources/runtimets/results/hdfs/hdfs_shortcircuit/hdfs_shortcircuit.1.adm
new file mode 100644
index 0000000..6da3bd8
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/results/hdfs/hdfs_shortcircuit/hdfs_shortcircuit.1.adm
@@ -0,0 +1,6 @@
+[ { "word": "am", "count": 1 }
+, { "word": "grover", "count": 1 }
+, { "word": "hi", "count": 1 }
+, { "word": "i", "count": 1 }
+, { "word": "raman", "count": 1 }
+ ]
diff --git a/asterix-app/src/test/resources/runtimets/testsuite.xml b/asterix-app/src/test/resources/runtimets/testsuite.xml
index 94ae2f2..88271de 100644
--- a/asterix-app/src/test/resources/runtimets/testsuite.xml
+++ b/asterix-app/src/test/resources/runtimets/testsuite.xml
@@ -6089,6 +6089,11 @@
</test-group>
<test-group name="hdfs">
+ <test-case FilePath="hdfs">
+ <compilation-unit name="hdfs_shortcircuit">
+ <output-dir compare="Text">hdfs_shortcircuit</output-dir>
+ </compilation-unit>
+ </test-case>
<test-case FilePath="hdfs">
<compilation-unit name="issue_245_hdfs">
<output-dir compare="Text">issue_245_hdfs</output-dir>
diff --git a/asterix-doc/src/site/markdown/aql/externaldata.md b/asterix-doc/src/site/markdown/aql/externaldata.md
index d2cc1a5..3d46cc3 100644
--- a/asterix-doc/src/site/markdown/aql/externaldata.md
+++ b/asterix-doc/src/site/markdown/aql/externaldata.md
@@ -3,26 +3,26 @@
## <a id="toc">Table of Contents</a> ##
* [Introduction](#Introduction)
- * [Adapter for an External Dataset](#IntroductionAdapterForAnExternalDataset)
- * [Creating an External Dataset](#IntroductionCreatingAnExternalDataset)
+* [Adapter for an External Dataset](#IntroductionAdapterForAnExternalDataset)
+* [Creating an External Dataset](#IntroductionCreatingAnExternalDataset)
* [Writing Queries against an External Dataset](#WritingQueriesAgainstAnExternalDataset)
+* [Building Indexes over External Datasets](#BuildingIndexesOverExternalDatasets)
+* [External Data Snapshots](#ExternalDataSnapshot)
+* [Frequently Asked Questions](#FAQ)
## <a id="Introduction">Introduction</a> <font size="4"><a href="#toc">[Back to TOC]</a></font> ##
-Data that needs to be processed by ASTERIX could be residing outside ASTERIX storage. Examples include data files on a distributed file system such as HDFS or on the local file system of a machine that is part of an ASTERIX cluster. For ASTERIX to process such data, end-user may create a regular dataset in ASTERIX (a.k.a. internal dataset) and load the dataset with the data. ASTERIX supports ''external datasets'' so that it is not necessary to “load” all data prior to using it. This also avoids creating multiple copies of data and the need to keep the copies in sync.
+Data that needs to be processed by AsterixDB could be residing outside AsterixDB storage. Examples include data files on a distributed file system such as HDFS or on the local file system of a machine that is part of an AsterixDB cluster. For AsterixDB to process such data, an end-user may create a regular dataset in AsterixDB (a.k.a. an internal dataset) and load the dataset with the data. AsterixDB also supports ‘‘external datasets’’ so that it is not necessary to “load” all data prior to using it. This also avoids creating multiple copies of data and the need to keep the copies in sync.
### <a id="IntroductionAdapterForAnExternalDataset">Adapter for an External Dataset</a> <font size="4"><a href="#toc">[Back to TOC]</a></font> ###
-External data is accessed using wrappers (adapters in ASTERIX) that abstract away the mechanism of connecting with an external service, receiving data and transforming the data into ADM records that are understood by ASTERIX. ASTERIX comes with built-in adapters for common storage systems such as HDFS or the local file system.
+External data is accessed using wrappers (adapters in AsterixDB) that abstract away the mechanism of connecting with an external service, receiving its data and transforming the data into ADM records that are understood by AsterixDB. AsterixDB comes with built-in adapters for common storage systems such as HDFS or the local file system.
### <a id="IntroductionCreatingAnExternalDataset">Creating an External Dataset</a> <font size="4"><a href="#toc">[Back to TOC]</a></font> ###
-As an example we consider the Lineitem dataset from [TPCH schema](http://www.openlinksw.com/dataspace/doc/dav/wiki/Main/VOSTPCHLinkedData/tpch.sql).
+As an example we consider the Lineitem dataset from the [TPCH schema](http://www.openlinksw.com/dataspace/doc/dav/wiki/Main/VOSTPCHLinkedData/tpch.sql).
+We assume that you have successfully created an AsterixDB instance following the instructions at [Installing AsterixDB Using Managix](../install.html). _For constructing an example, we assume a single machine setup.._
-We assume that you have successfully created an ASTERIX instance following the instructions at [Installing Asterix Using Managix](../install.html).
-_For constructing an example, we assume a single machine setup._
-
-Similar to a regular dataset, an external dataset has an associated datatype. We shall first create the datatype associated with each record in Lineitem data.
-Paste the following in the query textbox on the webpage at http://127.0.0.1:19001 and hit 'Execute'.
-
+Similar to a regular dataset, an external dataset has an associated datatype. We shall first create the datatype associated with each record in Lineitem data. Paste the following in the
+query textbox on the webpage at http://127.0.0.1:19001 and hit ‘Execute’.
create dataverse ExternalFileDemo;
use dataverse ExternalFileDemo;
@@ -46,7 +46,7 @@
l_comment: string}
-We describe here two scenarios.
+Here, we describe two scenarios.
#### 1) Data file resides on the local file system of a host####
Prerequisite: The host is a part of the ASTERIX cluster.
@@ -77,7 +77,10 @@
<tr>
<td> format </td>
<td> The format for the content. Use 'adm' for data in ADM (ASTERIX Data Model) or <a href="http://www.json.org/">JSON</a> format. Use 'delimited-text' if fields are separated by a delimiting character (eg., CSV). </td></tr>
-<tr><td>delimiter</td><td>The delimiting character in the source file if format is 'delimited text'</td></tr>
+<tr>
+ <td>delimiter</td>
+ <td>The delimiting character in the source file if format is 'delimited text'</td>
+</tr>
</table>
As we are using a single single machine ASTERIX instance, we use 127.0.0.1 as host in the path parameter.
@@ -112,15 +115,12 @@
Next we move over to the the section [Writing Queries against an External Dataset](#Writing_Queries_against_an_External_Dataset) and try a sample query against the external dataset.
#### 2) Data file resides on an HDFS instance ####
-Pre-requisite: It is required that the Namenode and atleast one of the HDFS Datanodes are reachable from the hosts that form the ASTERIX cluster. ASTERIX provides a built-in adapter for data residing on HDFS. The HDFS adapter is referred (in AQL) by its alias - 'hdfs'. We create an external dataset named Lineitem and associate the HDFS adapter with it.
+rerequisite: It is required that the Namenode and HDFS Datanodes are reachable from the hosts that form the AsterixDB cluster. AsterixDB provides a built-in adapter for data residing on HDFS. The HDFS adapter can be referred (in AQL) by its alias - ‘hdfs’. We can create an external dataset named Lineitem and associate the HDFS adapter with it as follows;
+ create external dataset Lineitem(LineitemType)
+ using hdfs((“hdfs”:”hdfs://localhost:54310”),(“path”:”/asterix/Lineitem.tbl”),...,(“input- format”:”rc-format”));
- create external dataset Lineitem(LineitemType)
- using hdfs
-
-
-The above statement is *not complete* as we need to provide a set of parameters specific to the HDFS instance and the source file.
-These parameters are described below.
+The expected parameters are described below:
<table>
<tr>
@@ -133,50 +133,126 @@
</tr>
<tr>
<td> path </td>
- <td> The absolute path to the source HDFS file. Use a comma separated list if there are multiple files. </td></tr>
+ <td> The absolute path to the source HDFS file or directory. Use a comma separated list if there are multiple files or directories. </td></tr>
<tr>
<td> input-format </td>
- <td> The associated input format. Use 'text-input-format' for textual data or 'sequence-input-format' for binary data (sequence files). </td>
+ <td> The associated input format. Use 'text-input-format' for text files , 'sequence-input-format' for hadoop sequence files, 'rc-input-format' for Hadoop Record Columnar files, or a fully qualified name of an implementation of org.apache.hadoop.mapred.InputFormat. </td>
</tr>
<tr>
<td> format </td>
- <td> The format for the content. Use 'adm' for data in ADM (ASTERIX Data Model) or
- <a href="http://www.json.org/">JSON</a> format and use 'delimited-text' for delimited data
- that has fields separated by a delimiting character. </td>
+ <td> The format of the input content. Use 'adm' for text data in ADM (ASTERIX Data Model) or <a href="http://www.json.org/">JSON</a> format, 'delimited-text' for text delimited data that has fields separated by a delimiting character, 'binary' for other data.</td>
</tr>
<tr>
<td> delimiter </td>
<td> The delimiting character in the source file if format is 'delimited text' </td>
</tr>
+<tr>
+ <td> parser </td>
+ <td> The parser used to parse HDFS records if the format is 'binary'. Use 'hive- parser' for data deserialized by a Hive Serde (AsterixDB can understand deserialized Hive objects) or a fully qualified class name of user- implemented parser that implements the interface edu.uci.ics.asterix.external.input.InputParser. </td>
+</tr>
+<tr>
+ <td> hive-serde </td>
+ <td> The Hive serde is used to deserialize HDFS records if format is binary and the parser is hive-parser. Use a fully qualified name of a class implementation of org.apache.hadoop.hive.serde2.SerDe. </td>
+</tr>
+<tr>
+ <td> local-socket-path </td>
+ <td> The UNIX domain socket path if local short-circuit reads are enabled in the HDFS instance</td>
+</tr>
</table>
*Difference between 'input-format' and 'format'*
-*input-format*: File stored under HDFS have an associated storage format For example, TextInputFormat represents plain text files. SequenceFileInputFormat indicates binary compressed file. The parameter 'input-format' is used to distinguish between these two kind of files.
+*input-format*: Files stored under HDFS have an associated storage format. For example,
+TextInputFormat represents plain text files. SequenceFileInputFormat indicates binary compressed files. RCFileInputFormat corresponds to records stored in a record columnar fashion. The parameter ‘input-format’ is used to distinguish between these and other HDFS input formats.
-*format*:
-The parameter 'format' refers to the type of the data contained in the file. For example data contained in a file could be in json, ADM format or could be delimited-text with fields separated by a delimiting character.
+*format*: The parameter ‘format’ refers to the type of the data contained in the file. For example, data contained in a file could be in json or ADM format, could be in delimited-text with fields separated by a delimiting character or could be in binary format.
As an example. consider the [data file](../data/lineitem.tbl). The file is a text file with each line representing a record. The fields in each record are separated by the '|' character.
-We assume the HDFS URL to be hdfs://host:port. We further assume that the example data file is copied to the HDFS at a path denoted by HDFS_PATH.
+We assume the HDFS URL to be hdfs://localhost:54310. We further assume that the example data file is copied to HDFS at a path denoted by “/asterix/Lineitem.tbl”.
-The complete set of parameters for our example file are as follows. (("hdfs"="HDFS_URL",("path"="HDFS_PATH"),("input-format"="text-input-format"),("format"="delimited-text"),("delimiter"="|"))
-
-We modify the create external dataset statement as follows.
+The complete set of parameters for our example file are as follows. ((“hdfs”=“hdfs://localhost:54310”,(“path”=“/asterix/Lineitem.tbl”),(“input-format”=“text- input-format”),(“format”=“delimited-text”),(“delimiter”=“|”))
- create external dataset Lineitem('LineitemType)
- using hdfs
- (("hdfs"="HDFS_URL"),("path"="HDFS_PATH"),("input-format"="text-input-format"),("format"="delimited-text"),("delimiter"="|"));
+#### Using the Hive Parser ####
+
+if a user wants to create an external dataset that uses hive-parser to parse HDFS records, it is important that the datatype associated with the dataset matches the actual data in the Hive table for the correct initialization of the Hive SerDe. Here is the conversion from the supported Hive data types to AsterixDB data types:
+
+<table>
+<tr>
+ <td> Hive </td>
+ <td> AsterixDB </td>
+</tr>
+<tr>
+ <td>BOOLEAN</td>
+ <td>Boolean</td>
+</tr>
+<tr>
+ <td>BYTE(TINY INT)</td>
+ <td>Int8</td>
+</tr>
+<tr>
+ <td>DOUBLE</td>
+ <td>Double</td>
+</tr>
+<tr>
+ <td>FLOAT</td>
+ <td>Float</td>
+</tr>
+<tr>
+ <td>INT</td>
+ <td>Int32</td>
+</tr>
+<tr>
+ <td>LONG(BIG INT)</td>
+ <td>Int64</td>
+</tr>
+<tr>
+ <td>SHORT(SMALL INT)</td>
+ <td>Int16</td>
+</tr>
+<tr>
+ <td>STRING</td>
+ <td>String</td>
+</tr>
+<tr>
+ <td>TIMESTAMP</td>
+ <td>Datetime</td>
+</tr>
+<tr>
+ <td>DATE</td>
+ <td>Date</td>
+</tr>
+<tr>
+ <td>STRUCT</td>
+ <td>Nested Record</td>
+</tr>
+<tr>
+ <td>LIST</td>
+ <td>OrderedList or UnorderedList</td>
+</tr>
+</table>
-Once you have copied the source data file to your HDFS instance, substitute the values of HDFS_URL and HDFS_PATH in the above statement. In your web-browser, navigate to http://127.0.0.1:19001 and execute the above statement with substituted values.
+#### Examples of dataset definitions for external datasets ####
-You may now run the sample query in next section.
+*Example 1*: We can modify the create external dataset statement as follows:
+
+ create external dataset Lineitem('LineitemType)
+ using hdfs(("hdfs"="hdfs://localhost:54310"),("path"="/asterix/Lineitem.tbl"),("input-format"="text- input-format"),("format"="delimited-text"),("delimiter"="|"));
+
+*Example 2*: Here, we create an external dataset of lineitem records stored in sequence files that has content in ADM format:
+
+ create external dataset Lineitem('LineitemType)
+ using hdfs(("hdfs"="hdfs://localhost:54310"),("path"="/asterix/SequenceLineitem.tbl"),("input- format"="sequence-input-format"),("format"="adm"));
+
+*Example 3*: Here, we create an external dataset of lineitem records stored in record-columnar files that has content in binary format parsed using hive-parser with hive ColumnarSerde:
+
+ create external dataset Lineitem('LineitemType)
+ using hdfs(("hdfs"="hdfs://localhost:54310"),("path"="/asterix/RCLineitem.tbl"),("input-format"="rc-input-format"),("format"="binary"),("parser"="hive-parser"),("hive- serde"="org.apache.hadoop.hive.serde2.columnar.ColumnarSerde"));
## <a id="WritingQueriesAgainstAnExternalDataset">Writing Queries against an External Dataset</a> <font size="4"><a href="#toc">[Back to TOC]</a></font> ##
-You may write AQL queries against an external dataset. Following is an example AQL query that applies a filter and returns an ordered result.
+You may write AQL queries against an external dataset in exactly the same way that queries are written against internal datasets. The following is an example of an AQL query that applies a filter and returns an ordered result.
use dataverse ExternalFileDemo;
@@ -186,21 +262,58 @@
order by $c.l_orderkey, $c.l_linenumber
return $c
+## <a id="BuildingIndexesOverExternalDatasets">Building Indexes over External Datasets</a> <font size="4"><a href="#toc">[Back to TOC]</a></font> ##
+AsterixDB supports building B-Tree and R-Tree indexes over static data stored in the Hadoop Distributed File System.
+To create an index, first create an external dataset over the data as follows
-The expected result is:
+ create external dataset Lineitem(LineitemType)
+ using hdfs(("hdfs"="hdfs://localhost:54310"),("path"="/asterix/Lineitem.tbl"),("input-format"="text-input- format"),("format"="delimited-text"),("delimiter"="|"));
+You can then create a B-Tree index on this dataset instance as if the dataset was internally stored as follows:
- { "l_orderkey": 1, "l_partkey": 156, "l_suppkey": 4, "l_linenumber": 1, "l_quantity": 17, "l_extendedprice": 17954.55d, "l_discount": 0.04d, "l_tax": 0.02d, "l_returnflag": "N", "l_linestatus": "O", "l_shipdate": "1996-03-13", "l_commitdate": "1996-02-12", "l_receiptdate": "1996-03-22", "l_shipinstruct": "DELIVER IN PERSON", "l_shipmode": "TRUCK", "l_comment": "egular courts above the" }
- { "l_orderkey": 1, "l_partkey": 68, "l_suppkey": 9, "l_linenumber": 2, "l_quantity": 36, "l_extendedprice": 34850.16d, "l_discount": 0.09d, "l_tax": 0.06d, "l_returnflag": "N", "l_linestatus": "O", "l_shipdate": "1996-04-12", "l_commitdate": "1996-02-28", "l_receiptdate": "1996-04-20", "l_shipinstruct": "TAKE BACK RETURN", "l_shipmode": "MAIL", "l_comment": "ly final dependencies: slyly bold " }
- { "l_orderkey": 1, "l_partkey": 64, "l_suppkey": 5, "l_linenumber": 3, "l_quantity": 8, "l_extendedprice": 7712.48d, "l_discount": 0.1d, "l_tax": 0.02d, "l_returnflag": "N", "l_linestatus": "O", "l_shipdate": "1996-01-29", "l_commitdate": "1996-03-05", "l_receiptdate": "1996-01-31", "l_shipinstruct": "TAKE BACK RETURN", "l_shipmode": "REG AIR", "l_comment": "riously. regular, express dep" }
- { "l_orderkey": 1, "l_partkey": 3, "l_suppkey": 6, "l_linenumber": 4, "l_quantity": 28, "l_extendedprice": 25284.0d, "l_discount": 0.09d, "l_tax": 0.06d, "l_returnflag": "N", "l_linestatus": "O", "l_shipdate": "1996-04-21", "l_commitdate": "1996-03-30", "l_receiptdate": "1996-05-16", "l_shipinstruct": "NONE", "l_shipmode": "AIR", "l_comment": "lites. fluffily even de" }
- { "l_orderkey": 1, "l_partkey": 25, "l_suppkey": 8, "l_linenumber": 5, "l_quantity": 24, "l_extendedprice": 22200.48d, "l_discount": 0.1d, "l_tax": 0.04d, "l_returnflag": "N", "l_linestatus": "O", "l_shipdate": "1996-03-30", "l_commitdate": "1996-03-14", "l_receiptdate": "1996-04-01", "l_shipinstruct": "NONE", "l_shipmode": "FOB", "l_comment": " pending foxes. slyly re" }
- { "l_orderkey": 1, "l_partkey": 16, "l_suppkey": 3, "l_linenumber": 6, "l_quantity": 32, "l_extendedprice": 29312.32d, "l_discount": 0.07d, "l_tax": 0.02d, "l_returnflag": "N", "l_linestatus": "O", "l_shipdate": "1996-01-30", "l_commitdate": "1996-02-07", "l_receiptdate": "1996-02-03", "l_shipinstruct": "DELIVER IN PERSON", "l_shipmode": "MAIL", "l_comment": "arefully slyly ex" }
- { "l_orderkey": 2, "l_partkey": 107, "l_suppkey": 2, "l_linenumber": 1, "l_quantity": 38, "l_extendedprice": 38269.8d, "l_discount": 0.0d, "l_tax": 0.05d, "l_returnflag": "N", "l_linestatus": "O", "l_shipdate": "1997-01-28", "l_commitdate": "1997-01-14", "l_receiptdate": "1997-02-02", "l_shipinstruct": "TAKE BACK RETURN", "l_shipmode": "RAIL", "l_comment": "ven requests. deposits breach a" }
- { "l_orderkey": 3, "l_partkey": 5, "l_suppkey": 2, "l_linenumber": 1, "l_quantity": 45, "l_extendedprice": 40725.0d, "l_discount": 0.06d, "l_tax": 0.0d, "l_returnflag": "R", "l_linestatus": "F", "l_shipdate": "1994-02-02", "l_commitdate": "1994-01-04", "l_receiptdate": "1994-02-23", "l_shipinstruct": "NONE", "l_shipmode": "AIR", "l_comment": "ongside of the furiously brave acco" }
- { "l_orderkey": 3, "l_partkey": 20, "l_suppkey": 10, "l_linenumber": 2, "l_quantity": 49, "l_extendedprice": 45080.98d, "l_discount": 0.1d, "l_tax": 0.0d, "l_returnflag": "R", "l_linestatus": "F", "l_shipdate": "1993-11-09", "l_commitdate": "1993-12-20", "l_receiptdate": "1993-11-24", "l_shipinstruct": "TAKE BACK RETURN", "l_shipmode": "RAIL", "l_comment": " unusual accounts. eve" }
- { "l_orderkey": 3, "l_partkey": 129, "l_suppkey": 8, "l_linenumber": 3, "l_quantity": 27, "l_extendedprice": 27786.24d, "l_discount": 0.06d, "l_tax": 0.07d, "l_returnflag": "A", "l_linestatus": "F", "l_shipdate": "1994-01-16", "l_commitdate": "1993-11-22", "l_receiptdate": "1994-01-23", "l_shipinstruct": "DELIVER IN PERSON", "l_shipmode": "SHIP", "l_comment": "nal foxes wake. " }
- { "l_orderkey": 3, "l_partkey": 30, "l_suppkey": 5, "l_linenumber": 4, "l_quantity": 2, "l_extendedprice": 1860.06d, "l_discount": 0.01d, "l_tax": 0.06d, "l_returnflag": "A", "l_linestatus": "F", "l_shipdate": "1993-12-04", "l_commitdate": "1994-01-07", "l_receiptdate": "1994-01-01", "l_shipinstruct": "NONE", "l_shipmode": "TRUCK", "l_comment": "y. fluffily pending d" }
- { "l_orderkey": 3, "l_partkey": 184, "l_suppkey": 5, "l_linenumber": 5, "l_quantity": 28, "l_extendedprice": 30357.04d, "l_discount": 0.04d, "l_tax": 0.0d, "l_returnflag": "R", "l_linestatus": "F", "l_shipdate": "1993-12-14", "l_commitdate": "1994-01-10", "l_receiptdate": "1994-01-01", "l_shipinstruct": "TAKE BACK RETURN", "l_shipmode": "FOB", "l_comment": "ages nag slyly pending" }
- { "l_orderkey": 3, "l_partkey": 63, "l_suppkey": 8, "l_linenumber": 6, "l_quantity": 26, "l_extendedprice": 25039.56d, "l_discount": 0.1d, "l_tax": 0.02d, "l_returnflag": "A", "l_linestatus": "F", "l_shipdate": "1993-10-29", "l_commitdate": "1993-12-18", "l_receiptdate": "1993-11-04", "l_shipinstruct": "TAKE BACK RETURN", "l_shipmode": "RAIL", "l_comment": "ges sleep after the caref" }
+ create index PartkeyIdx on Lineitem(l_partkey);
+You could also create an R-Tree index as follows:
+
+ create index IndexName on DatasetName(attribute-name) type rtree;
+
+After building the indexes, the AsterixDB query compiler can use them to access the dataset and answer queries in a more cost effective manner.
+AsterixDB can read all HDFS input formats, but indexes over external datasets can currently be built only for HDFS datasets with 'text-input-format', 'sequence-input-format' or 'rc-input-format'.
+
+## <a id="ExternalDataSnapshots">External Data Snapshots</a> <font size="4"><a href="#toc">[Back to TOC]</a></font> ##
+An external data snapshot represents the status of a dataset's files in HDFS at a point in time. Upon creating the first index over an external dataset, AsterixDB captures and stores a snapshot of the dataset in HDFS. Only records present at the snapshot capture time are indexed, and any additional indexes created afterwards will only contain data that was present at the snapshot capture time thus preserving consistency across all indexes of a dataset.
+To update all indexes of an external dataset and advance the snapshot time to be the present time, a user can use the refresh external dataset command as follows:
+
+ refresh external dataset DatasetName;
+
+After a refresh operation commits, all of the dataset's indexes will reflect the status of the data as of the new snapshot capture time.
+
+## <a id="FAQ">Frequently Asked Questions</a> <font size="4"><a href="#toc">[Back to TOC]</a></font> ##
+
+Q. I added data to my dataset in HDFS, Will the dataset indexes in AsterixDB be updated automatically?
+
+A. No, you must use the refresh external dataset statement to make the indexes aware of any changes in the dataset files in HDFS.
+
+Q. Why doesn't AsterixDB update external indexes automatically?
+
+A. Since external data is managed by other users/systems with mechanisms that are system dependent, AsterixDB has no way of knowing exactly when data is added or deleted in HDFS, so the responsibility of refreshing indexes are left to the user. A user can use internal datasets for which AsterixDB manages the data and its indexes.
+
+Q. I created an index over an external dataset and then added some data to my HDFS dataset. Will a query that uses the index return different results from a query that doesn't use the index?
+
+A. No, queries' results are access path independent and the stored snapshot is used to determines which data are going to be included when processing queries.
+
+Q. I created an index over an external dataset and then deleted some of my dataset's files in HDFS, Will indexed data access still return the records in deleted files?
+
+A. No. When AsterixDB accesses external data, with or without the use of indexes, it only access files present in the file system at runtime.
+
+Q. I submitted a refresh command on a an external dataset and a failure occurred, What has happened to my indexes?
+
+A. External Indexes Refreshes are treated as a single transaction. In case of a failure, a rollback occurs and indexes are restored to their previous state. An error message with the cause of failure is returned to the user.
+
+Q. I was trying to refresh an external dataset while some queries were accessing the data using index access method. Will the queries be affected by the refresh operation?
+
+A. Queries have access to external dataset indexes state at the time where the queries are submitted. A query that was submitted before a refresh commits will only access data under the snapshot taken before the refresh; queries that are submitted after the refresh commits will access data under the snapshot taken after the refresh.
+
+Q. What happens when I try to create an additional index while a refresh operation is in progress or vice versa?
+
+A. The create index operation will wait until the refresh commits or aborts and then the index will be built according to the external data snapshot at the end of the refresh operation. Creating indexes and refreshing datasets are mutually exclusive operations and will not be run in parallel. Multiple indexes can be created in parallel, but not multiple refresh operations.
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HDFSAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HDFSAdapterFactory.java
index ee481de..7595c32 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HDFSAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HDFSAdapterFactory.java
@@ -73,6 +73,20 @@
public static final String INPUT_FORMAT_RC = "rc-input-format";
public static final String FORMAT_BINARY = "binary";
+ public static final String KEY_LOCAL_SOCKET_PATH = "local-socket-path";
+
+ // Hadoop property names constants
+ public static final String CLASS_NAME_TEXT_INPUT_FORMAT = "org.apache.hadoop.mapred.TextInputFormat";
+ public static final String CLASS_NAME_SEQUENCE_INPUT_FORMAT = "org.apache.hadoop.mapred.SequenceFileInputFormat";
+ public static final String CLASS_NAME_RC_INPUT_FORMAT = "org.apache.hadoop.hive.ql.io.RCFileInputFormat";
+ public static final String CLASS_NAME_HDFS_FILESYSTEM = "org.apache.hadoop.hdfs.DistributedFileSystem";
+ public static final String KEY_HADOOP_FILESYSTEM_URI = "fs.defaultFS";
+ public static final String KEY_HADOOP_FILESYSTEM_CLASS = "fs.hdfs.impl";
+ public static final String KEY_HADOOP_INPUT_DIR = "mapred.input.dir";
+ public static final String KEY_HADOOP_INPUT_FORMAT = "mapred.input.format.class";
+ public static final String KEY_HADOOP_SHORT_CIRCUIT = "dfs.client.read.shortcircuit";
+ public static final String KEY_HADOOP_SOCKET_PATH = "dfs.domain.socket.path";
+
private transient AlgebricksPartitionConstraint clusterLocations;
private String[] readSchedule;
private boolean executed[];
@@ -100,9 +114,9 @@
protected static Map<String, String> initInputFormatMap() {
Map<String, String> formatClassNames = new HashMap<String, String>();
- formatClassNames.put(INPUT_FORMAT_TEXT, "org.apache.hadoop.mapred.TextInputFormat");
- formatClassNames.put(INPUT_FORMAT_SEQUENCE, "org.apache.hadoop.mapred.SequenceFileInputFormat");
- formatClassNames.put(INPUT_FORMAT_RC, "org.apache.hadoop.hive.ql.io.RCFileInputFormat");
+ formatClassNames.put(INPUT_FORMAT_TEXT, CLASS_NAME_TEXT_INPUT_FORMAT);
+ formatClassNames.put(INPUT_FORMAT_SEQUENCE, CLASS_NAME_SEQUENCE_INPUT_FORMAT);
+ formatClassNames.put(INPUT_FORMAT_RC, CLASS_NAME_RC_INPUT_FORMAT);
return formatClassNames;
}
@@ -128,14 +142,21 @@
public static JobConf configureJobConf(Map<String, String> configuration) throws Exception {
JobConf conf = new JobConf();
String formatClassName = (String) formatClassNames.get(((String) configuration.get(KEY_INPUT_FORMAT)).trim());
+ String localShortCircuitSocketPath = (String) configuration.get(KEY_LOCAL_SOCKET_PATH);
if (formatClassName == null) {
formatClassName = ((String) configuration.get(KEY_INPUT_FORMAT)).trim();
}
- conf.set("fs.default.name", ((String) configuration.get(KEY_HDFS_URL)).trim());
- conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
+ conf.set(KEY_HADOOP_FILESYSTEM_URI, ((String) configuration.get(KEY_HDFS_URL)).trim());
+ conf.set(KEY_HADOOP_FILESYSTEM_CLASS, CLASS_NAME_HDFS_FILESYSTEM);
conf.setClassLoader(HDFSAdapter.class.getClassLoader());
- conf.set("mapred.input.dir", ((String) configuration.get(KEY_PATH)).trim());
- conf.set("mapred.input.format.class", formatClassName);
+ conf.set(KEY_HADOOP_INPUT_DIR, ((String) configuration.get(KEY_PATH)).trim());
+ conf.set(KEY_HADOOP_INPUT_FORMAT, formatClassName);
+
+ // Enable local short circuit reads if user supplied the parameters
+ if (localShortCircuitSocketPath != null) {
+ conf.set(KEY_HADOOP_SHORT_CIRCUIT, "true");
+ conf.set(KEY_HADOOP_SOCKET_PATH, localShortCircuitSocketPath.trim());
+ }
return conf;
}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/indexing/input/HDFSSeekableLineReader.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/indexing/input/HDFSSeekableLineReader.java
new file mode 100644
index 0000000..c317140
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/indexing/input/HDFSSeekableLineReader.java
@@ -0,0 +1,246 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * contains code adapted from Hadoop HDFS Project, which is licensed as follows:
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.external.indexing.input;
+
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.io.Text;
+
+/**
+ * A class that provides a line reader from an input stream which also allows performing seek operations
+ */
+public class HDFSSeekableLineReader {
+ private static final int DEFAULT_BUFFER_SIZE = 32 * 1024;
+ private int bufferSize = DEFAULT_BUFFER_SIZE;
+ private FSDataInputStream reader;
+
+ private byte[] buffer;
+ // the number of bytes of real data in the buffer
+ private int bufferLength = 0;
+ // the current position in the buffer
+ private int bufferPosn = 0;
+
+ private long currentFilePos = 0L;
+
+ private static final byte CR = '\r';
+ private static final byte LF = '\n';
+
+ public static final String KEY_BUFFER_SIZE = "io.file.buffer.size";
+
+ /**
+ * Create a line reader that reads from the given stream using the
+ * default buffer-size (32k).
+ *
+ * @param in
+ * The input stream
+ * @throws IOException
+ */
+ public HDFSSeekableLineReader(FSDataInputStream in) throws IOException {
+ this(in, DEFAULT_BUFFER_SIZE);
+ }
+
+ /**
+ * Create a line reader that reads from the given stream using the
+ * given buffer-size.
+ *
+ * @param in
+ * The input stream
+ * @param bufferSize
+ * Size of the read buffer
+ * @throws IOException
+ */
+ public HDFSSeekableLineReader(FSDataInputStream in, int bufferSize) throws IOException {
+ this.reader = in;
+ this.bufferSize = bufferSize;
+ this.buffer = new byte[this.bufferSize];
+ currentFilePos = in.getPos();
+ }
+
+ public HDFSSeekableLineReader() throws IOException {
+ this.bufferSize = DEFAULT_BUFFER_SIZE;
+ this.buffer = new byte[this.bufferSize];
+ }
+
+ /**
+ * Create a line reader that reads from the given stream using the <code>io.file.buffer.size</code> specified in the given <code>Configuration</code>.
+ *
+ * @param in
+ * input stream
+ * @param conf
+ * configuration
+ * @throws IOException
+ */
+ public HDFSSeekableLineReader(FSDataInputStream in, Configuration conf) throws IOException {
+ this(in, conf.getInt(KEY_BUFFER_SIZE, DEFAULT_BUFFER_SIZE));
+ }
+
+ /**
+ * Read one line from the InputStream into the given Text. A line
+ * can be terminated by one of the following: '\n' (LF) , '\r' (CR),
+ * or '\r\n' (CR+LF). EOF also terminates an otherwise unterminated
+ * line.
+ *
+ * @param str
+ * the object to store the given line (without newline)
+ * @param maxLineLength
+ * the maximum number of bytes to store into str;
+ * the rest of the line is silently discarded.
+ * @param maxBytesToConsume
+ * the maximum number of bytes to consume
+ * in this call. This is only a hint, because if the line cross
+ * this threshold, we allow it to happen. It can overshoot
+ * potentially by as much as one buffer length.
+ * @return the number of bytes read including the (longest) newline
+ * found.
+ * @throws IOException
+ * if the underlying stream throws
+ */
+ public int readLine(Text str, int maxLineLength, int maxBytesToConsume) throws IOException {
+ /* We're reading data from in, but the head of the stream may be
+ * already buffered in buffer, so we have several cases:
+ * 1. No newline characters are in the buffer, so we need to copy
+ * everything and read another buffer from the stream.
+ * 2. An unambiguously terminated line is in buffer, so we just
+ * copy to str.
+ * 3. Ambiguously terminated line is in buffer, i.e. buffer ends
+ * in CR. In this case we copy everything up to CR to str, but
+ * we also need to see what follows CR: if it's LF, then we
+ * need consume LF as well, so next call to readLine will read
+ * from after that.
+ * We use a flag prevCharCR to signal if previous character was CR
+ * and, if it happens to be at the end of the buffer, delay
+ * consuming it until we have a chance to look at the char that
+ * follows.
+ */
+ str.clear();
+ int txtLength = 0; //tracks str.getLength(), as an optimization
+ int newlineLength = 0; //length of terminating newline
+ boolean prevCharCR = false; //true of prev char was CR
+ long bytesConsumed = 0;
+ do {
+ int startPosn = bufferPosn; //starting from where we left off the last time
+ if (bufferPosn >= bufferLength) {
+ startPosn = bufferPosn = 0;
+ if (prevCharCR)
+ ++bytesConsumed; //account for CR from previous read
+ bufferLength = reader.read(buffer);
+ if (bufferLength <= 0)
+ break; // EOF
+ }
+ for (; bufferPosn < bufferLength; ++bufferPosn) { //search for newline
+ if (buffer[bufferPosn] == LF) {
+ newlineLength = (prevCharCR) ? 2 : 1;
+ ++bufferPosn; // at next invocation proceed from following byte
+ break;
+ }
+ if (prevCharCR) { //CR + notLF, we are at notLF
+ newlineLength = 1;
+ break;
+ }
+ prevCharCR = (buffer[bufferPosn] == CR);
+ }
+ int readLength = bufferPosn - startPosn;
+ if (prevCharCR && newlineLength == 0)
+ --readLength; //CR at the end of the buffer
+ bytesConsumed += readLength;
+ int appendLength = readLength - newlineLength;
+ if (appendLength > maxLineLength - txtLength) {
+ appendLength = maxLineLength - txtLength;
+ }
+ if (appendLength > 0) {
+ str.append(buffer, startPosn, appendLength);
+ txtLength += appendLength;
+ }
+ } while (newlineLength == 0 && bytesConsumed < maxBytesToConsume);
+
+ if (bytesConsumed > (long) Integer.MAX_VALUE)
+ throw new IOException("Too many bytes before newline: " + bytesConsumed);
+ currentFilePos = reader.getPos() - bufferLength + bufferPosn;
+ return (int) bytesConsumed;
+ }
+
+ /**
+ * Read from the InputStream into the given Text.
+ *
+ * @param str
+ * the object to store the given line
+ * @param maxLineLength
+ * the maximum number of bytes to store into str.
+ * @return the number of bytes read including the newline
+ * @throws IOException
+ * if the underlying stream throws
+ */
+ public int readLine(Text str, int maxLineLength) throws IOException {
+ return readLine(str, maxLineLength, Integer.MAX_VALUE);
+ }
+
+ /**
+ * Read from the InputStream into the given Text.
+ *
+ * @param str
+ * the object to store the given line
+ * @return the number of bytes read including the newline
+ * @throws IOException
+ * if the underlying stream throws
+ */
+ public int readLine(Text str) throws IOException {
+ return readLine(str, Integer.MAX_VALUE, Integer.MAX_VALUE);
+ }
+
+ public void seek(long desired) throws IOException {
+ if (reader.getPos() <= desired || currentFilePos > desired) {
+ // desired position is ahead of stream or before the current position, seek to position
+ reader.seek(desired);
+ bufferLength = 0;
+ bufferPosn = 0;
+ currentFilePos = desired;
+ } else if (currentFilePos < desired) {
+ // desired position is in the buffer
+ int difference = (int) (desired - currentFilePos);
+ bufferPosn += difference;
+ currentFilePos = desired;
+ }
+ }
+
+ public FSDataInputStream getReader() {
+ return reader;
+ }
+
+ public void resetReader(FSDataInputStream reader) throws IOException {
+ this.reader = reader;
+ bufferLength = 0;
+ bufferPosn = 0;
+ currentFilePos = reader.getPos();
+ }
+}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/indexing/input/TextFileLookupInputStream.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/indexing/input/TextFileLookupInputStream.java
index 0102b28..6d2fb67 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/indexing/input/TextFileLookupInputStream.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/indexing/input/TextFileLookupInputStream.java
@@ -16,41 +16,46 @@
import java.io.IOException;
-import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import edu.uci.ics.asterix.metadata.external.ExternalFileIndexAccessor;
+public class TextFileLookupInputStream extends AbstractHDFSLookupInputStream {
-@SuppressWarnings("deprecation")
-public class TextFileLookupInputStream extends AbstractHDFSLookupInputStream{
+ private HDFSSeekableLineReader lineReader = new HDFSSeekableLineReader();
+ private Text value = new Text();
- private FSDataInputStream reader;
- public TextFileLookupInputStream(ExternalFileIndexAccessor filesIndexAccessor, JobConf conf) throws IOException{
+ public TextFileLookupInputStream(ExternalFileIndexAccessor filesIndexAccessor, JobConf conf) throws IOException {
super(filesIndexAccessor, conf);
}
+
@Override
- protected void openFile(String fileName) throws IOException {
- if (reader != null) {
- reader.close();
+ public void openFile(String FileName) throws IOException {
+ if (lineReader.getReader() != null) {
+ lineReader.getReader().close();
}
- reader = fs.open(new Path(fileName));
+ lineReader.resetReader(fs.open(new Path(FileName)));
}
-
+
@Override
- public void close() throws IOException {
- if (reader != null) {
- reader.close();
+ public void close() {
+ if (lineReader.getReader() != null) {
+ try {
+ lineReader.getReader().close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
}
- super.close();
}
-
+
@Override
protected boolean read(long recordOffset) {
try {
- reader.seek(recordOffset);
- pendingValue = reader.readLine();
+ lineReader.seek(recordOffset);
+ lineReader.readLine(value);
+ pendingValue = value.toString();
return true;
} catch (IOException e) {
// file was opened and then when trying to seek and read, an error occurred <- should we throw an exception ???->
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/indexing/input/TextFileLookupReader.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/indexing/input/TextFileLookupReader.java
index 3f84503..3326bae 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/indexing/input/TextFileLookupReader.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/indexing/input/TextFileLookupReader.java
@@ -18,10 +18,10 @@
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
import edu.uci.ics.asterix.common.config.DatasetConfig.ExternalFilePendingOp;
import edu.uci.ics.asterix.metadata.entities.ExternalFile;
@@ -33,14 +33,15 @@
private boolean skipFile = false;
private ExternalFile file = new ExternalFile(null, null, 0, null, null, 0L, ExternalFilePendingOp.PENDING_NO_OP);
private ExternalFileIndexAccessor filesIndexAccessor;
- private FSDataInputStream reader;
+ private HDFSSeekableLineReader lineReader;
+ private Text value = new Text();
public TextFileLookupReader(ExternalFileIndexAccessor filesIndexAccessor, Configuration conf) throws IOException {
- fs = FileSystem.get(conf);
+ this.fs = FileSystem.get(conf);
this.filesIndexAccessor = filesIndexAccessor;
+ this.lineReader = new HDFSSeekableLineReader();
}
- @SuppressWarnings("deprecation")
@Override
public String read(int fileNumber, long recordOffset) throws Exception {
if (fileNumber != this.fileNumber) {
@@ -67,24 +68,26 @@
} else if (skipFile) {
return null;
}
- reader.seek(recordOffset);
- return reader.readLine();
+ lineReader.seek(recordOffset);
+ lineReader.readLine(value);
+ return value.toString();
}
private void openFile(String FileName) throws IOException {
- if (reader != null) {
- reader.close();
+ if(lineReader.getReader() != null){
+ lineReader.getReader().close();
}
- reader = fs.open(new Path(FileName));
+ lineReader.resetReader(fs.open(new Path(FileName)));
}
public void close() {
- if (reader != null)
+ if (lineReader.getReader() != null){
try {
- reader.close();
+ lineReader.getReader().close();
} catch (IOException e) {
e.printStackTrace();
}
+ }
}
}