The Optimized Row Columnar (ORC) file format provides a highly efficient way
to store Hive data. It was designed to overcome the limitations of the other Hive
file formats. Using ORC files improves performance when Hive is reading,
writing, and processing data. Many computing engines, from Hive to Presto, can read and write
ORC files. However, when it comes to reading or writing ORC files using core Java,
there is little help available beyond the official documentation. This article is for you
if you want to write your own code to read or write ORC files.
In this article, we will create a simple ORC writer and a reader to write and read ORC
files. Later, the writer and the reader will be enhanced to support the common
ORC types, along with some minor optimizations.
Requirements:
- JDK 14 or a later version (the Maven compiler plugin below targets Java 14)
- Apache Maven
- IntelliJ IDEA or Eclipse with Maven support
Step 01:
Create a new Maven project in IntelliJ IDEA with the group id com.javahelps.orc and the artifact id orc-demo.
Step 02:
Add the following dependencies to the
pom.xml.
<dependency>
<groupId>org.apache.orc</groupId>
<artifactId>orc-core</artifactId>
<version>1.6.3</version>
</dependency>
<dependency>
<groupId>org.apache.orc</groupId>
<artifactId>orc-tools</artifactId>
<version>1.6.3</version>
</dependency>
<dependency>
<groupId>org.apache.orc</groupId>
<artifactId>orc-mapreduce</artifactId>
<version>1.6.3</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>3.3.0</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>29.0-jre</version>
</dependency>
These are the minimal dependencies required to write and read ORC files in core Java. Most of the classes used in this article can also be found in other platform-specific libraries, but we will stick to these core dependencies. After adding them, the pom file should look like this:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.javahelps.orc</groupId>
<artifactId>orc-demo</artifactId>
<version>1.0-SNAPSHOT</version>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>14</source>
<target>14</target>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>org.apache.orc</groupId>
<artifactId>orc-core</artifactId>
<version>1.6.3</version>
</dependency>
<dependency>
<groupId>org.apache.orc</groupId>
<artifactId>orc-tools</artifactId>
<version>1.6.3</version>
</dependency>
<dependency>
<groupId>org.apache.orc</groupId>
<artifactId>orc-mapreduce</artifactId>
<version>1.6.3</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>3.3.0</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>29.0-jre</version>
</dependency>
</dependencies>
</project>
After adding the dependencies, do not forget to reload the Maven changes in your IDE.
Step 03:
Create a new package com.javahelps.orc under the src/main/java folder and create a new Java class named OrcFileWriter.
Step 04:
Add the following write method along with a main method to test it.
package com.javahelps.orc;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.OrcFile;
import org.apache.orc.TypeDescription;
import org.apache.orc.Writer;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
public class OrcFileWriter {
public static void main(String[] args) throws IOException {
List<Map<String, Object>> data = new LinkedList<>();
data.add(Map.of("order_id", 1, "item_name", "Laptop", "price", 800.0f));
data.add(Map.of("order_id", 2, "item_name", "Mouse", "price", 150.0f));
data.add(Map.of("order_id", 3, "item_name", "Keyboard", "price", 250.0f));
write(new Configuration(), "orders.orc", "struct<order_id:int,item_name:string,price:float>", data);
System.out.println("Done");
}
public static void write(Configuration configuration, String path, String struct, List<Map<String, Object>> data) throws IOException {
// Create the schemas and extract metadata from the schema
TypeDescription schema = TypeDescription.fromString(struct);
// Create a row batch
VectorizedRowBatch batch = schema.createRowBatch();
// Get the column vector references
LongColumnVector orderIdColumnVector = (LongColumnVector) batch.cols[0];
BytesColumnVector itemNameColumnVector = (BytesColumnVector) batch.cols[1];
DoubleColumnVector priceColumnVector = (DoubleColumnVector) batch.cols[2];
// Open a writer to write the data to an ORC file
try (Writer writer = OrcFile.createWriter(new Path(path),
OrcFile.writerOptions(configuration)
.setSchema(schema))) {
for (Map<String, Object> row : data) {
// batch.size should be increased externally
int rowNum = batch.size++;
// Write each column to the associated column vector
orderIdColumnVector.vector[rowNum] = (Integer) row.get("order_id");
byte[] buffer = row.get("item_name").toString().getBytes(StandardCharsets.UTF_8);
itemNameColumnVector.setRef(rowNum, buffer, 0, buffer.length);
priceColumnVector.vector[rowNum] = (Float) row.get("price");
// If the buffer is full, write it to disk
if (batch.size == batch.getMaxSize()) {
writer.addRowBatch(batch);
batch.reset();
}
}
// Check unwritten rows before closing
if (batch.size != 0) {
writer.addRowBatch(batch);
}
}
}
}
Important lines are commented in the code. As you can see, this class creates
an ORC file to store orders with the fields order_id, item_name and price.
Type-specific ColumnVectors are required to write column values to the file.
Since ORC files are column-oriented, data is both written and read through column vectors.
The ORC writer must know the schema of the data in order to cast each ColumnVector
to the right subtype and write the values accordingly. The schema here is parsed from a
string, but it can also be built programmatically, as sketched below.
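For reference, the same schema can also be built with the TypeDescription builder API instead of parsing a string. The following is a minimal sketch and is not part of the example code above:
// Equivalent to TypeDescription.fromString("struct<order_id:int,item_name:string,price:float>")
TypeDescription schema = TypeDescription.createStruct()
.addField("order_id", TypeDescription.createInt())
.addField("item_name", TypeDescription.createString())
.addField("price", TypeDescription.createFloat());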
Apart from the technical details, running this class will create a new ORC
file named orders.orc with three rows.
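If you want to verify the file before writing a full reader, a minimal sketch like the one below prints the schema and the row count of orders.orc. It assumes the file is in the working directory and uses the same org.apache.orc classes introduced later in Step 06, inside a method that declares IOException:
// Open the freshly written file and print its schema and number of rows
try (Reader reader = OrcFile.createReader(new Path("orders.orc"), OrcFile.readerOptions(new Configuration()))) {
System.out.println(reader.getSchema()); // struct<order_id:int,item_name:string,price:float>
System.out.println(reader.getNumberOfRows()); // 3
}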
Step 05 [Optional]:
The OrcFileWriter can be made generic using a utility method that casts the ColumnVector based on the type and puts the data into the vector. Create a new static method named createColumnWriter in the same class with the following code:
public static BiConsumer<Integer, Object> createColumnWriter(TypeDescription description, ColumnVector columnVector) {
String type = description.getCategory().getName();
BiConsumer<Integer, Object> consumer;
if ("tinyint".equals(type)) {
consumer = (row, val) -> ((LongColumnVector) columnVector).vector[row] = ((Number) val).longValue();
} else if ("smallint".equals(type)) {
consumer = (row, val) -> ((LongColumnVector) columnVector).vector[row] = ((Number) val).longValue();
} else if ("int".equals(type) || "date".equals(type)) {
// Date is represented as int epoch days
consumer = (row, val) -> ((LongColumnVector) columnVector).vector[row] = ((Number) val).longValue();
} else if ("bigint".equals(type)) {
consumer = (row, val) -> ((LongColumnVector) columnVector).vector[row] = ((Number) val).longValue();
} else if ("boolean".equals(type)) {
consumer = (row, val) -> ((LongColumnVector) columnVector).vector[row] = (Boolean) val ? 1 : 0;
} else if ("float".equals(type)) {
consumer = (row, val) -> ((DoubleColumnVector) columnVector).vector[row] = ((Number) val).floatValue();
} else if ("double".equals(type)) {
consumer = (row, val) -> ((DoubleColumnVector) columnVector).vector[row] = ((Number) val).doubleValue();
} else if ("decimal".equals(type)) {
consumer = (row, val) -> ((DecimalColumnVector) columnVector).vector[row].set(HiveDecimal.create((BigDecimal) val));
} else if ("string".equals(type) || type.startsWith("varchar") || "char".equals(type)) {
consumer = (row, val) -> {
byte[] buffer = val.toString().getBytes(StandardCharsets.UTF_8);
((BytesColumnVector) columnVector).setRef(row, buffer, 0, buffer.length);
};
} else if ("timestamp".equals(type)) {
consumer = (row, val) -> ((TimestampColumnVector) columnVector).set(row, (Timestamp) val);
} else {
throw new RuntimeException("Unsupported type " + type);
}
return consumer;
}
This method automatically determines the type of the column from the schema. Note that the return type is a BiConsumer which accepts the row number and the value as an Object and puts the value into the matching ColumnVector. The method is used in the complete writer further below; a short standalone sketch comes first.
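As a hypothetical illustration of how the returned BiConsumer works in isolation (this snippet is not part of the article's code), the price column of the orders schema can be written as follows:
// Build the schema and a row batch, then write a single value into the price column
TypeDescription schema = TypeDescription.fromString("struct<order_id:int,item_name:string,price:float>");
VectorizedRowBatch batch = schema.createRowBatch();
// price is the third field, so its type and column vector are at index 2
BiConsumer<Integer, Object> priceWriter = createColumnWriter(schema.getChildren().get(2), batch.cols[2]);
priceWriter.accept(0, 800.0f); // stores 800.0 in row 0 of the price DoubleColumnVector
The complete class below uses the same idea for every column of the schema.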
package com.javahelps.orc;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.hadoop.hive.ql.exec.vector.*;
import org.apache.orc.OrcFile;
import org.apache.orc.TypeDescription;
import org.apache.orc.Writer;
import java.io.IOException;
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
public class OrcFileWriter {
public static void main(String[] args) throws IOException {
List<Map<String, Object>> data = new LinkedList<>();
data.add(Map.of("order_id", 1, "item_name", "Laptop", "price", 800.0f));
data.add(Map.of("order_id", 2, "item_name", "Mouse", "price", 150.0f));
data.add(Map.of("order_id", 3, "item_name", "Keyboard", "price", 250.0f));
write(new Configuration(), "orders.orc", "struct<order_id:int,item_name:string,price:float>", data);
System.out.println("Done");
}
public static void write(Configuration configuration, String path, String struct, List<Map<String, Object>> data) throws IOException {
// Create the schemas and extract metadata from the schema
TypeDescription schema = TypeDescription.fromString(struct);
List<String> fieldNames = schema.getFieldNames();
List<TypeDescription> columnTypes = schema.getChildren();
// Create a row batch
VectorizedRowBatch batch = schema.createRowBatch();
// Get the column vector references
List<BiConsumer<Integer, Object>> consumers = new ArrayList<>(columnTypes.size());
for (int i = 0; i < columnTypes.size(); i++) {
TypeDescription type = columnTypes.get(i);
ColumnVector vector = batch.cols[i];
consumers.add(createColumnWriter(type, vector));
}
// Open a writer to write the data to an ORC file
try (Writer writer = OrcFile.createWriter(new Path(path),
OrcFile.writerOptions(configuration)
.setSchema(schema))) {
for (Map<String, Object> row : data) {
// batch.size should be increased externally
int rowNum = batch.size++;
// Write each column to the associated column vector
for (int i = 0; i < fieldNames.size(); i++) {
consumers.get(i).accept(rowNum, row.get(fieldNames.get(i)));
}
// If the buffer is full, write it to disk
if (batch.size == batch.getMaxSize()) {
writer.addRowBatch(batch);
batch.reset();
}
}
// Check unwritten rows before closing
if (batch.size != 0) {
writer.addRowBatch(batch);
}
}
}
public static BiConsumer<Integer, Object> createColumnWriter(TypeDescription description, ColumnVector columnVector) {
String type = description.getCategory().getName();
BiConsumer<Integer, Object> consumer;
if ("tinyint".equals(type)) {
consumer = (row, val) -> ((LongColumnVector) columnVector).vector[row] = ((Number) val).longValue();
} else if ("smallint".equals(type)) {
consumer = (row, val) -> ((LongColumnVector) columnVector).vector[row] = ((Number) val).longValue();
} else if ("int".equals(type) || "date".equals(type)) {
// Date is represented as int epoch days
consumer = (row, val) -> ((LongColumnVector) columnVector).vector[row] = ((Number) val).longValue();
} else if ("bigint".equals(type)) {
consumer = (row, val) -> ((LongColumnVector) columnVector).vector[row] = ((Number) val).longValue();
} else if ("boolean".equals(type)) {
consumer = (row, val) -> ((LongColumnVector) columnVector).vector[row] = (Boolean) val ? 1 : 0;
} else if ("float".equals(type)) {
consumer = (row, val) -> ((DoubleColumnVector) columnVector).vector[row] = ((Number) val).floatValue();
} else if ("double".equals(type)) {
consumer = (row, val) -> ((DoubleColumnVector) columnVector).vector[row] = ((Number) val).doubleValue();
} else if ("decimal".equals(type)) {
consumer = (row, val) -> ((DecimalColumnVector) columnVector).vector[row].set(HiveDecimal.create((BigDecimal) val));
} else if ("string".equals(type) || type.startsWith("varchar") || "char".equals(type)) {
consumer = (row, val) -> {
byte[] buffer = val.toString().getBytes(StandardCharsets.UTF_8);
((BytesColumnVector) columnVector).setRef(row, buffer, 0, buffer.length);
};
} else if ("timestamp".equals(type)) {
consumer = (row, val) -> ((TimestampColumnVector) columnVector).set(row, (Timestamp) val);
} else {
throw new RuntimeException("Unsupported type " + type);
}
return consumer;
}
}
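As an illustration (not part of the original example), the generic write method should now handle other supported types as well. A hypothetical call with a different schema and file name might look like this:
// Write a hypothetical events.orc file mixing bigint, boolean and double columns
List<Map<String, Object>> events = new LinkedList<>();
events.add(Map.of("event_id", 1001L, "active", true, "score", 4.5));
events.add(Map.of("event_id", 1002L, "active", false, "score", 3.2));
write(new Configuration(), "events.orc", "struct<event_id:bigint,active:boolean,score:double>", events);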
Step 06:
Now it is time to read! Create a new class named OrcFileReader in the same package with the following code.
package com.javahelps.orc;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.OrcFile;
import org.apache.orc.Reader;
import org.apache.orc.RecordReader;
import org.apache.orc.TypeDescription;
import java.io.IOException;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
public class OrcFileReader {
private static final int BATCH_SIZE = 2048;
public static void main(String[] args) throws IOException {
List<Map<String, Object>> rows = read(new Configuration(), "orders.orc");
for (Map<String, Object> row : rows) {
System.out.println(row);
}
}
public static List<Map<String, Object>> read(Configuration configuration, String path)
throws IOException {
// Create a list to collect rows
List<Map<String, Object>> rows = new LinkedList<>();
// Create an ORC reader using the Hadoop fileSystem and path
try (Reader reader = OrcFile.createReader(new Path(path), OrcFile.readerOptions(configuration))) {
// Extract the schema
TypeDescription schema = reader.getSchema();
try (RecordReader records = reader.rows(reader.options())) {
// Read rows in batch for better performance.
VectorizedRowBatch batch = reader.getSchema().createRowBatch(BATCH_SIZE);
LongColumnVector orderIdColumnVector = (LongColumnVector) batch.cols[0];
BytesColumnVector itemNameColumnVector = (BytesColumnVector) batch.cols[1];
DoubleColumnVector priceColumnVector = (DoubleColumnVector) batch.cols[2];
while (records.nextBatch(batch)) {
for (int rowNum = 0; rowNum < batch.size; rowNum++) {
// Read rows from the batch
Map<String, Object> map = new HashMap<>();
map.put("order_id", orderIdColumnVector.vector[rowNum]);
map.put("item_name", itemNameColumnVector.toString(rowNum));
map.put("price", priceColumnVector.vector[rowNum]);
rows.add(map);
}
}
}
}
return rows;
}
}
This code is similar to the writer in Step 04. Since we know the schema of the orders.orc file, we blindly cast the vectors to LongColumnVector, BytesColumnVector and DoubleColumnVector, in that order.
Running this class will print the values we wrote to the ORC file in Step 04.
{price=800.0, item_name=Laptop, order_id=1}
{price=150.0, item_name=Mouse, order_id=2}
{price=250.0, item_name=Keyboard, order_id=3}
Step 07 [Optional]:
Similar to the enhancement we made to the OrcFileWriter in Step 05, the OrcFileReader can be improved to find the type of ColumnVector automatically. Add the following utility method to the OrcFileReader class.
public static BiFunction<ColumnVector, Integer, Object> createColumnReader(TypeDescription description) {
// Reference: https://orc.apache.org/docs/core-java.html
String type = description.getCategory().getName();
BiFunction<ColumnVector, Integer, Object> mapper;
if ("tinyint".equals(type)) {
mapper = (columnVector, row) -> (byte) ((LongColumnVector) columnVector).vector[row];
} else if ("smallint".equals(type)) {
mapper = (columnVector, row) -> (short) ((LongColumnVector) columnVector).vector[row];
} else if ("int".equals(type) || "date".equals(type)) {
// Date is represented as int epoch days
mapper = (columnVector, row) -> (int) ((LongColumnVector) columnVector).vector[row];
} else if ("bigint".equals(type)) {
mapper = (columnVector, row) -> ((LongColumnVector) columnVector).vector[row];
} else if ("boolean".equals(type)) {
mapper = (columnVector, row) -> ((LongColumnVector) columnVector).vector[row] == 1;
} else if ("float".equals(type)) {
mapper = (columnVector, row) -> (float) ((DoubleColumnVector) columnVector).vector[row];
} else if ("double".equals(type)) {
mapper = (columnVector, row) -> ((DoubleColumnVector) columnVector).vector[row];
} else if ("decimal".equals(type)) {
mapper = (columnVector, row) -> ((DecimalColumnVector) columnVector).vector[row].getHiveDecimal().bigDecimalValue();
} else if ("string".equals(type) || type.startsWith("varchar")) {
mapper = (columnVector, row) -> ((BytesColumnVector) columnVector).toString(row);
} else if ("char".equals(type)) {
mapper = (columnVector, row) -> ((BytesColumnVector) columnVector).toString(row).charAt(0);
} else if ("timestamp".equals(type)) {
mapper = (columnVector, row) -> ((TimestampColumnVector) columnVector).getTimestampAsLong(row);
} else {
throw new RuntimeException("Unsupported type " + type);
}
return mapper;
}
This method creates a BiFunction for each type. Given a ColumnVector and a row number, it returns the value stored in the given ColumnVector at the given index.
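As a hypothetical standalone illustration (not part of the article's code), the mapper for the price column of the orders schema can be used like this:
// Build the schema and a row batch, fill one value by hand, then read it back through the mapper
TypeDescription schema = TypeDescription.fromString("struct<order_id:int,item_name:string,price:float>");
VectorizedRowBatch batch = schema.createRowBatch();
((DoubleColumnVector) batch.cols[2]).vector[0] = 800.0; // pretend row 0 was filled by the writer
BiFunction<ColumnVector, Integer, Object> priceReader = createColumnReader(schema.getChildren().get(2));
Object price = priceReader.apply(batch.cols[2], 0); // returns 800.0 as a Float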
Modify the read method as shown below to use the createColumnReader method.
package com.javahelps.orc;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.*;
import org.apache.orc.OrcFile;
import org.apache.orc.Reader;
import org.apache.orc.RecordReader;
import org.apache.orc.TypeDescription;
import java.io.IOException;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
public class OrcFileReader {
private static final int BATCH_SIZE = 2048;
public static void main(String[] args) throws IOException {
List<Map<String, Object>> rows = read(new Configuration(), "orders.orc");
for (Map<String, Object> row : rows) {
System.out.println(row);
}
}
public static List<Map<String, Object>> read(Configuration configuration, String path)
throws IOException {
// Create a list to collect rows
List<Map<String, Object>> rows = new LinkedList<>();
// Create an ORC reader using the Hadoop fileSystem and path
try (Reader reader = OrcFile.createReader(new Path(path), OrcFile.readerOptions(configuration))) {
// Extract the schema and metadata from the reader
TypeDescription schema = reader.getSchema();
List<String> fieldNames = schema.getFieldNames();
List<TypeDescription> columnTypes = schema.getChildren();
// Get the column vector references
int size = fieldNames.size();
BiFunction<ColumnVector, Integer, Object>[] mappers = new BiFunction[size];
for (int i = 0; i < size; i++) {
TypeDescription type = columnTypes.get(i);
mappers[i] = createColumnReader(type);
}
try (RecordReader records = reader.rows(reader.options())) {
// Read rows in batch for better performance.
VectorizedRowBatch batch = reader.getSchema().createRowBatch(BATCH_SIZE);
while (records.nextBatch(batch)) {
for (int row = 0; row < batch.size; row++) {
// Read rows from the batch
Map<String, Object> map = new HashMap<>();
for (int col = 0; col < batch.numCols; col++) {
ColumnVector columnVector = batch.cols[col];
if (columnVector.isNull[row]) {
map.put(fieldNames.get(col), null);
} else {
Object value = mappers[col].apply(columnVector, row);
map.put(fieldNames.get(col), value);
}
}
rows.add(map);
}
}
}
}
return rows;
}
public static BiFunction<ColumnVector, Integer, Object> createColumnReader(TypeDescription description) {
// Reference: https://orc.apache.org/docs/core-java.html
String type = description.getCategory().getName();
BiFunction<ColumnVector, Integer, Object> mapper;
if ("tinyint".equals(type)) {
mapper = (columnVector, row) -> (byte) ((LongColumnVector) columnVector).vector[row];
} else if ("smallint".equals(type)) {
mapper = (columnVector, row) -> (short) ((LongColumnVector) columnVector).vector[row];
} else if ("int".equals(type) || "date".equals(type)) {
// Date is represented as int epoch days
mapper = (columnVector, row) -> (int) ((LongColumnVector) columnVector).vector[row];
} else if ("bigint".equals(type)) {
mapper = (columnVector, row) -> ((LongColumnVector) columnVector).vector[row];
} else if ("boolean".equals(type)) {
mapper = (columnVector, row) -> ((LongColumnVector) columnVector).vector[row] == 1;
} else if ("float".equals(type)) {
mapper = (columnVector, row) -> (float) ((DoubleColumnVector) columnVector).vector[row];
} else if ("double".equals(type)) {
mapper = (columnVector, row) -> ((DoubleColumnVector) columnVector).vector[row];
} else if ("decimal".equals(type)) {
mapper = (columnVector, row) -> ((DecimalColumnVector) columnVector).vector[row].getHiveDecimal().bigDecimalValue();
} else if ("string".equals(type) || type.startsWith("varchar")) {
mapper = (columnVector, row) -> ((BytesColumnVector) columnVector).toString(row);
} else if ("char".equals(type)) {
mapper = (columnVector, row) -> ((BytesColumnVector) columnVector).toString(row).charAt(0);
} else if ("timestamp".equals(type)) {
mapper = (columnVector, row) -> ((TimestampColumnVector) columnVector).getTimestampAsLong(row);
} else {
throw new RuntimeException("Unsupported type " + type);
}
return mapper;
}
}
The above change prints the same result, but without hard-coding the ColumnVector types.
Further optimizations can be made to the OrcFileReader to make it faster. Big data engines usually break an ORC file into small chunks and read them in parallel, in addition to several other optimizations. Adding concurrency is out of the scope of this article. Instead, the following section explains how to skip reading unnecessary columns from an ORC file.
Filtering Columns
The ORC Reader#rows method takes a Reader.Options object. Using this object, we can control which columns are read. A boolean array is used to mark the columns that we want to read from an ORC file.
Create a new static method as shown below. This method constructs the boolean array to select columns and adds the selected column indices to a list for future reference.
public static boolean[] createColumnsToRead(TypeDescription schema, Set<String> columns, List<Integer> indices) {
// Create an array of boolean
boolean[] columnsToRead = new boolean[schema.getMaximumId() + 1];
List<String> fieldNames = schema.getFieldNames();
List<TypeDescription> columnTypes = schema.getChildren();
for (int i = 0; i < fieldNames.size(); i++) {
if (columns.contains(fieldNames.get(i))) {
indices.add(i);
TypeDescription type = columnTypes.get(i);
for (int id = type.getId(); id <= type.getMaximumId(); id++) {
columnsToRead[id] = true;
}
}
}
return columnsToRead;
}
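As a hypothetical worked example (not part of the article's code), selecting only order_id and price from the orders schema produces the following array and indices:
// Column ids in this schema: 0 = root struct, 1 = order_id, 2 = item_name, 3 = price
TypeDescription schema = TypeDescription.fromString("struct<order_id:int,item_name:string,price:float>");
List<Integer> indices = new ArrayList<>();
boolean[] include = createColumnsToRead(schema, Set.of("order_id", "price"), indices);
// include -> [false, true, false, true]
// indices -> [0, 2] (positions of the selected fields within the struct)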
This method can be used in the read method of the OrcFileReader as shown below. Note how the list of selected indices is used to create the column mappers and to read data from the VectorizedRowBatch.
package com.javahelps.orc;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.*;
import org.apache.orc.OrcFile;
import org.apache.orc.Reader;
import org.apache.orc.RecordReader;
import org.apache.orc.TypeDescription;
import java.io.IOException;
import java.util.*;
import java.util.function.BiFunction;
public class OrcFileReader {
private static final int BATCH_SIZE = 2048;
public static void main(String[] args) throws IOException {
List<Map<String, Object>> rows = read(new Configuration(), "orders.orc");
for (Map<String, Object> row : rows) {
System.out.println(row);
}
}
public static List<Map<String, Object>> read(Configuration configuration, String path)
throws IOException {
// Create a list to collect rows
List<Map<String, Object>> rows = new LinkedList<>();
// Create an ORC reader using the Hadoop fileSystem and path
try (Reader reader = OrcFile.createReader(new Path(path), OrcFile.readerOptions(configuration))) {
// Extract the schema and metadata from the reader
TypeDescription schema = reader.getSchema();
List<String> fieldNames = schema.getFieldNames();
List<TypeDescription> columnTypes = schema.getChildren();
// Select the columns to read (all three here; pass a subset such as order_id and price to skip columns)
List<Integer> selectedColumns = new ArrayList<>();
boolean[] columnsToRead = createColumnsToRead(schema, Set.of("order_id", "item_name", "price"), selectedColumns);
// Get the column vector references
int size = fieldNames.size();
BiFunction<ColumnVector, Integer, Object>[] mappers = new BiFunction[size];
for (int i : selectedColumns) {
TypeDescription type = columnTypes.get(i);
mappers[i] = createColumnReader(type);
}
// Pass the columnsToRead to the reader to read only the selected columns
try (RecordReader records = reader.rows(reader.options().include(columnsToRead))) {
// Read rows in batch for better performance.
VectorizedRowBatch batch = reader.getSchema().createRowBatch(BATCH_SIZE);
while (records.nextBatch(batch)) {
for (int row = 0; row < batch.size; row++) {
// Read rows from the batch
Map<String, Object> map = new HashMap<>(selectedColumns.size());
for (int col : selectedColumns) {
ColumnVector columnVector = batch.cols[col];
if (columnVector.isNull[row]) {
map.put(fieldNames.get(col), null);
} else {
Object value = mappers[col].apply(columnVector, row);
map.put(fieldNames.get(col), value);
}
}
rows.add(map);
}
}
}
}
return rows;
}
public static boolean[] createColumnsToRead(TypeDescription schema, Set<String> columns, List<Integer> indices) {
// Create an array of boolean
boolean[] columnsToRead = new boolean[schema.getMaximumId() + 1];
List<String> fieldNames = schema.getFieldNames();
List<TypeDescription> columnTypes = schema.getChildren();
for (int i = 0; i < fieldNames.size(); i++) {
if (columns.contains(fieldNames.get(i))) {
indices.add(i);
TypeDescription type = columnTypes.get(i);
for (int id = type.getId(); id <= type.getMaximumId(); id++) {
columnsToRead[id] = true;
}
}
}
return columnsToRead;
}
public static BiFunction<ColumnVector, Integer, Object> createColumnReader(TypeDescription description) {
// Reference: https://orc.apache.org/docs/core-java.html
String type = description.getCategory().getName();
BiFunction<ColumnVector, Integer, Object> mapper;
if ("tinyint".equals(type)) {
mapper = (columnVector, row) -> (byte) ((LongColumnVector) columnVector).vector[row];
} else if ("smallint".equals(type)) {
mapper = (columnVector, row) -> (short) ((LongColumnVector) columnVector).vector[row];
} else if ("int".equals(type) || "date".equals(type)) {
// Date is represented as int epoch days
mapper = (columnVector, row) -> (int) ((LongColumnVector) columnVector).vector[row];
} else if ("bigint".equals(type)) {
mapper = (columnVector, row) -> ((LongColumnVector) columnVector).vector[row];
} else if ("boolean".equals(type)) {
mapper = (columnVector, row) -> ((LongColumnVector) columnVector).vector[row] == 1;
} else if ("float".equals(type)) {
mapper = (columnVector, row) -> (float) ((DoubleColumnVector) columnVector).vector[row];
} else if ("double".equals(type)) {
mapper = (columnVector, row) -> ((DoubleColumnVector) columnVector).vector[row];
} else if ("decimal".equals(type)) {
mapper = (columnVector, row) -> ((DecimalColumnVector) columnVector).vector[row].getHiveDecimal().bigDecimalValue();
} else if ("string".equals(type) || type.startsWith("varchar")) {
mapper = (columnVector, row) -> ((BytesColumnVector) columnVector).toString(row);
} else if ("char".equals(type)) {
mapper = (columnVector, row) -> ((BytesColumnVector) columnVector).toString(row).charAt(0);
} else if ("timestamp".equals(type)) {
mapper = (columnVector, row) -> ((TimestampColumnVector) columnVector).getTimestampAsLong(row);
} else {
throw new RuntimeException("Unsupported type " + type);
}
return mapper;
}
}
That's all I planned for this article. I hope it helps you to read and write ORC files using core Java. If you have any questions or feedback, please comment below. You can find the source code of this project in the GitHub repository.