Read and Write ORC Files in Core Java

The Optimized Row Columnar (ORC) file format provides a highly efficient way to store Hive data. It was designed to overcome the limitations of the other Hive file formats, and using ORC files improves performance when Hive is reading, writing, and processing data. Many computing engines, from Hive to Presto, can read and write ORC files. However, when it comes to reading or writing ORC files in core Java, there is not much help available beyond the official documentation. This article is for you if you want to write your own code to read or write ORC files.
 
In this article, we will create a simple ORC writer and reader to write and read ORC files. Later, the writer and the reader will be enhanced to support the common ORC types, with some minor optimizations.


Requirements:
- JDK 14 (the Maven compiler plugin below targets Java 14)
- Apache Maven
- IntelliJ IDEA (or any other IDE)

Step 01:
Create a new Maven project in IntelliJ IDEA with the group ID com.javahelps.orc and the artifact ID orc-demo.

Step 02:
Add the following dependencies to the pom.xml.

<dependency>
    <groupId>org.apache.orc</groupId>
    <artifactId>orc-core</artifactId>
    <version>1.6.3</version>
</dependency>

<dependency>
    <groupId>org.apache.orc</groupId>
    <artifactId>orc-tools</artifactId>
    <version>1.6.3</version>
</dependency>

<dependency>
    <groupId>org.apache.orc</groupId>
    <artifactId>orc-mapreduce</artifactId>
    <version>1.6.3</version>
</dependency>

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>3.3.0</version>
</dependency>

<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>29.0-jre</version>
</dependency>
These are the minimal core dependencies required to write and read ORC files in core Java. Most of the classes used in this article can also be found in other platform-specific libraries, but we will stick to the original dependencies. After adding them, the pom file should look like this:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.javahelps.orc</groupId>
    <artifactId>orc-demo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>14</source>
                    <target>14</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

    <dependencies>
        <dependency>
            <groupId>org.apache.orc</groupId>
            <artifactId>orc-core</artifactId>
            <version>1.6.3</version>
        </dependency>

        <dependency>
            <groupId>org.apache.orc</groupId>
            <artifactId>orc-tools</artifactId>
            <version>1.6.3</version>
        </dependency>

        <dependency>
            <groupId>org.apache.orc</groupId>
            <artifactId>orc-mapreduce</artifactId>
            <version>1.6.3</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>3.3.0</version>
        </dependency>

        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>29.0-jre</version>
        </dependency>
    </dependencies>
</project>
After adding the dependencies, do not forget to reload the Maven changes in your IDE.

Step 03:
Create a new package com.javahelps.orc under the src/main/java folder, and create a new Java class named OrcFileWriter in it.

Step 04:
Add the following write method along with the main method to test it.

package com.javahelps.orc;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.OrcFile;
import org.apache.orc.TypeDescription;
import org.apache.orc.Writer;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

public class OrcFileWriter {
    public static void main(String[] args) throws IOException {
        List<Map<String, Object>> data = new LinkedList<>();
        data.add(Map.of("order_id", 1, "item_name", "Laptop", "price", 800.0f));
        data.add(Map.of("order_id", 2, "item_name", "Mouse", "price", 150.0f));
        data.add(Map.of("order_id", 3, "item_name", "Keyboard", "price", 250.0f));

        write(new Configuration(), "orders.orc", "struct<order_id:int,item_name:string,price:float>", data);

        System.out.println("Done");
    }

    public static void write(Configuration configuration, String path, String struct, List<Map<String, Object>> data) throws IOException {
        // Create the schemas and extract metadata from the schema
        TypeDescription schema = TypeDescription.fromString(struct);

        // Create a row batch
        VectorizedRowBatch batch = schema.createRowBatch();

        // Get the column vector references
        LongColumnVector orderIdColumnVector = (LongColumnVector) batch.cols[0];
        BytesColumnVector itemNameColumnVector = (BytesColumnVector) batch.cols[1];
        DoubleColumnVector priceColumnVector = (DoubleColumnVector) batch.cols[2];

        // Open a writer to write the data to an ORC file
        try (Writer writer = OrcFile.createWriter(new Path(path),
                OrcFile.writerOptions(configuration)
                        .setSchema(schema))) {
            for (Map<String, Object> row : data) {
                // batch.size should be increased externally
                int rowNum = batch.size++;

                // Write each column to the associated column vector
                orderIdColumnVector.vector[rowNum] = (Integer) row.get("order_id");
                byte[] buffer = row.get("item_name").toString().getBytes(StandardCharsets.UTF_8);
                itemNameColumnVector.setRef(rowNum, buffer, 0, buffer.length);
                priceColumnVector.vector[rowNum] = (Float) row.get("price");

                // If the buffer is full, write it to disk
                if (batch.size == batch.getMaxSize()) {
                    writer.addRowBatch(batch);
                    batch.reset();
                }
            }

            // Check unwritten rows before closing
            if (batch.size != 0) {
                writer.addRowBatch(batch);
            }
        }
    }
}
Important lines are commented in the code. As you can see, this class creates an ORC file to store orders with the fields order_id, item_name and price. Type-specific ColumnVectors are required to write column values to the file.

Since ORC files are column-oriented, data is both written and read through column vectors. The ORC writer must know the schema of the data in order to cast each ColumnVector and write values according to its type.
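
As a side note, the schema does not have to be parsed from a string; it can also be built programmatically. The following is a minimal sketch using the TypeDescription builder methods, producing a schema equivalent to the struct string used above.

// A minimal sketch: building the orders schema programmatically instead of
// parsing it with TypeDescription.fromString(...)
TypeDescription schema = TypeDescription.createStruct()
        .addField("order_id", TypeDescription.createInt())
        .addField("item_name", TypeDescription.createString())
        .addField("price", TypeDescription.createFloat());

// Prints: struct<order_id:int,item_name:string,price:float>
System.out.println(schema);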

Technical details aside, running this class creates a new ORC file named orders.orc containing three rows.
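
To confirm what was written, the file's footer metadata can be inspected with the same reader API that is covered in detail in Step 06. The following is a minimal sketch; the class name OrcFileInspector is just a hypothetical helper for this check.

package com.javahelps.orc;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.orc.OrcFile;
import org.apache.orc.Reader;

import java.io.IOException;

// Hypothetical helper class, not part of the main example
public class OrcFileInspector {
    public static void main(String[] args) throws IOException {
        try (Reader reader = OrcFile.createReader(new Path("orders.orc"),
                OrcFile.readerOptions(new Configuration()))) {
            // Metadata comes from the file footer; no rows are scanned
            System.out.println("Rows: " + reader.getNumberOfRows());
            System.out.println("Schema: " + reader.getSchema());
            System.out.println("Compression: " + reader.getCompressionKind());
        }
    }
}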


Step 05: [Optional]
The OrcFileWriter can be made generic with a utility method that casts the ColumnVector based on the column type and puts the data into the vector. Create a new static method named createColumnWriter in the same class with the following code:

public static BiConsumer<Integer, Object> createColumnWriter(TypeDescription description, ColumnVector columnVector) {
    String type = description.getCategory().getName();
    BiConsumer<Integer, Object> consumer;
    if ("tinyint".equals(type)) {
        consumer = (row, val) -> ((LongColumnVector) columnVector).vector[row] = ((Number) val).longValue();
    } else if ("smallint".equals(type)) {
        consumer = (row, val) -> ((LongColumnVector) columnVector).vector[row] = ((Number) val).longValue();
    } else if ("int".equals(type) || "date".equals(type)) {
        // Date is represented as int epoch days
        consumer = (row, val) -> ((LongColumnVector) columnVector).vector[row] = ((Number) val).longValue();
    } else if ("bigint".equals(type)) {
        consumer = (row, val) -> ((LongColumnVector) columnVector).vector[row] = ((Number) val).longValue();
    } else if ("boolean".equals(type)) {
        consumer = (row, val) -> ((LongColumnVector) columnVector).vector[row] = (Boolean) val ? 1 : 0;
    } else if ("float".equals(type)) {
        consumer = (row, val) -> ((DoubleColumnVector) columnVector).vector[row] = ((Number) val).floatValue();
    } else if ("double".equals(type)) {
        consumer = (row, val) -> ((DoubleColumnVector) columnVector).vector[row] = ((Number) val).doubleValue();
    } else if ("decimal".equals(type)) {
        consumer = (row, val) -> ((DecimalColumnVector) columnVector).vector[row].set(HiveDecimal.create((BigDecimal) val));
    } else if ("string".equals(type) || type.startsWith("varchar") || "char".equals(type)) {
        consumer = (row, val) -> {
            byte[] buffer = val.toString().getBytes(StandardCharsets.UTF_8);
            ((BytesColumnVector) columnVector).setRef(row, buffer, 0, buffer.length);
        };
    } else if ("timestamp".equals(type)) {
        consumer = (row, val) -> ((TimestampColumnVector) columnVector).set(row, (Timestamp) val);
    } else {
        throw new RuntimeException("Unsupported type " + type);
    }
    return consumer;
}
This method automatically determines the type of a column from the schema. Note that the return type is a BiConsumer, which accepts the row number and the value as an Object and puts the value into the ColumnVector. The method is used in the following code to pick the right ColumnVector automatically.

package com.javahelps.orc;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.hadoop.hive.ql.exec.vector.*;
import org.apache.orc.OrcFile;
import org.apache.orc.TypeDescription;
import org.apache.orc.Writer;

import java.io.IOException;
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

public class OrcFileWriter {
    public static void main(String[] args) throws IOException {
        List<Map<String, Object>> data = new LinkedList<>();
        data.add(Map.of("order_id", 1, "item_name", "Laptop", "price", 800.0f));
        data.add(Map.of("order_id", 2, "item_name", "Mouse", "price", 150.0f));
        data.add(Map.of("order_id", 3, "item_name", "Keyboard", "price", 250.0f));

        write(new Configuration(), "orders.orc", "struct<order_id:int,item_name:string,price:float>", data);

        System.out.println("Done");
    }

    public static void write(Configuration configuration, String path, String struct, List<Map<String, Object>> data) throws IOException {
        // Create the schemas and extract metadata from the schema
        TypeDescription schema = TypeDescription.fromString(struct);
        List<String> fieldNames = schema.getFieldNames();
        List<TypeDescription> columnTypes = schema.getChildren();

        // Create a row batch
        VectorizedRowBatch batch = schema.createRowBatch();

        // Get the column vector references
        List<BiConsumer<Integer, Object>> consumers = new ArrayList<>(columnTypes.size());
        for (int i = 0; i < columnTypes.size(); i++) {
            TypeDescription type = columnTypes.get(i);
            ColumnVector vector = batch.cols[i];
            consumers.add(createColumnWriter(type, vector));
        }

        // Open a writer to write the data to an ORC file
        try (Writer writer = OrcFile.createWriter(new Path(path),
                OrcFile.writerOptions(configuration)
                        .setSchema(schema))) {
            for (Map<String, Object> row : data) {
                // batch.size should be increased externally
                int rowNum = batch.size++;

                // Write each column to the associated column vector
                for (int i = 0; i < fieldNames.size(); i++) {
                    consumers.get(i).accept(rowNum, row.get(fieldNames.get(i)));
                }

                // If the buffer is full, write it to disk
                if (batch.size == batch.getMaxSize()) {
                    writer.addRowBatch(batch);
                    batch.reset();
                }
            }

            // Check unwritten rows before closing
            if (batch.size != 0) {
                writer.addRowBatch(batch);
            }
        }
    }

    public static BiConsumer<Integer, Object> createColumnWriter(TypeDescription description, ColumnVector columnVector) {
        String type = description.getCategory().getName();
        BiConsumer<Integer, Object> consumer;
        if ("tinyint".equals(type)) {
            consumer = (row, val) -> ((LongColumnVector) columnVector).vector[row] = ((Number) val).longValue();
        } else if ("smallint".equals(type)) {
            consumer = (row, val) -> ((LongColumnVector) columnVector).vector[row] = ((Number) val).longValue();
        } else if ("int".equals(type) || "date".equals(type)) {
            // Date is represented as int epoch days
            consumer = (row, val) -> ((LongColumnVector) columnVector).vector[row] = ((Number) val).longValue();
        } else if ("bigint".equals(type)) {
            consumer = (row, val) -> ((LongColumnVector) columnVector).vector[row] = ((Number) val).longValue();
        } else if ("boolean".equals(type)) {
            consumer = (row, val) -> ((LongColumnVector) columnVector).vector[row] = (Boolean) val ? 1 : 0;
        } else if ("float".equals(type)) {
            consumer = (row, val) -> ((DoubleColumnVector) columnVector).vector[row] = ((Number) val).floatValue();
        } else if ("double".equals(type)) {
            consumer = (row, val) -> ((DoubleColumnVector) columnVector).vector[row] = ((Number) val).doubleValue();
        } else if ("decimal".equals(type)) {
            consumer = (row, val) -> ((DecimalColumnVector) columnVector).vector[row].set(HiveDecimal.create((BigDecimal) val));
        } else if ("string".equals(type) || type.startsWith("varchar") || "char".equals(type)) {
            consumer = (row, val) -> {
                byte[] buffer = val.toString().getBytes(StandardCharsets.UTF_8);
                ((BytesColumnVector) columnVector).setRef(row, buffer, 0, buffer.length);
            };
        } else if ("timestamp".equals(type)) {
            consumer = (row, val) -> ((TimestampColumnVector) columnVector).set(row, (Timestamp) val);
        } else {
            throw new RuntimeException("Unsupported type " + type);
        }
        return consumer;
    }
}
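
Note that the write method above assumes every value is present and non-null. If a column value may be null, the batch has to record it explicitly. The following sketch shows how the inner column loop of the enhanced write method could be adjusted, assuming a missing value is returned as null from the row map.

// A minimal sketch of null handling inside the column loop of write(...),
// assuming a missing value comes back as null from the row map
for (int i = 0; i < fieldNames.size(); i++) {
    Object value = row.get(fieldNames.get(i));
    ColumnVector vector = batch.cols[i];
    if (value == null) {
        // Mark the cell as null; noNulls must be cleared once any null is written
        vector.noNulls = false;
        vector.isNull[rowNum] = true;
    } else {
        consumers.get(i).accept(rowNum, value);
    }
}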


Step 06:
Now it is time to read! Create a new class named OrcFileReader in the same package with the following code.

package com.javahelps.orc;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.OrcFile;
import org.apache.orc.Reader;
import org.apache.orc.RecordReader;
import org.apache.orc.TypeDescription;

import java.io.IOException;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

public class OrcFileReader {
    private static final int BATCH_SIZE = 2048;

    public static void main(String[] args) throws IOException {
        List<Map<String, Object>> rows = read(new Configuration(), "orders.orc");
        for (Map<String, Object> row : rows) {
            System.out.println(row);
        }
    }

    public static List<Map<String, Object>> read(Configuration configuration, String path)
            throws IOException {
        // Create a list to collect rows
        List<Map<String, Object>> rows = new LinkedList<>();

        // Create an ORC reader using the Hadoop fileSystem and path
        try (Reader reader = OrcFile.createReader(new Path(path), OrcFile.readerOptions(configuration))) {
            // Extract the schema
            TypeDescription schema = reader.getSchema();

            try (RecordReader records = reader.rows(reader.options())) {
                // Read rows in batch for better performance.
                VectorizedRowBatch batch = reader.getSchema().createRowBatch(BATCH_SIZE);
                LongColumnVector orderIdColumnVector = (LongColumnVector) batch.cols[0];
                BytesColumnVector itemNameColumnVector = (BytesColumnVector) batch.cols[1];
                DoubleColumnVector priceColumnVector = (DoubleColumnVector) batch.cols[2];

                while (records.nextBatch(batch)) {
                    for (int rowNum = 0; rowNum < batch.size; rowNum++) {
                        // Read rows from the batch
                        Map<String, Object> map = new HashMap<>();
                        map.put("order_id", orderIdColumnVector.vector[rowNum]);
                        map.put("item_name", itemNameColumnVector.toString(rowNum));
                        map.put("price", priceColumnVector.vector[rowNum]);
                        rows.add(map);
                    }
                }
            }
        }
        return rows;
    }
}

This code is similar to the writer in Step 04. Since we know the schema of the orders.orc file, we blindly cast the vectors to LongColumnVector, BytesColumnVector and DoubleColumnVector, in that order.

Running this class will print the values we wrote to the ORC file in Step 04.
{price=800.0, item_name=Laptop, order_id=1}
{price=150.0, item_name=Mouse, order_id=2}
{price=250.0, item_name=Keyboard, order_id=3}

Step 07: [Optional]
Similar to the enhancement made to the OrcFileWriter in Step 05, the OrcFileReader can be improved to determine the type of each ColumnVector automatically. Add the following utility method to the OrcFileReader class.

public static BiFunction<ColumnVector, Integer, Object> createColumnReader(TypeDescription description) {
    // Reference: https://orc.apache.org/docs/core-java.html
    String type = description.getCategory().getName();
    BiFunction<ColumnVector, Integer, Object> mapper;
    if ("tinyint".equals(type)) {
        mapper = (columnVector, row) -> (byte) ((LongColumnVector) columnVector).vector[row];
    } else if ("smallint".equals(type)) {
        mapper = (columnVector, row) -> (short) ((LongColumnVector) columnVector).vector[row];
    } else if ("int".equals(type) || "date".equals(type)) {
        // Date is represented as int epoch days
        mapper = (columnVector, row) -> (int) ((LongColumnVector) columnVector).vector[row];
    } else if ("bigint".equals(type)) {
        mapper = (columnVector, row) -> ((LongColumnVector) columnVector).vector[row];
    } else if ("boolean".equals(type)) {
        mapper = (columnVector, row) -> ((LongColumnVector) columnVector).vector[row] == 1;
    } else if ("float".equals(type)) {
        mapper = (columnVector, row) -> (float) ((DoubleColumnVector) columnVector).vector[row];
    } else if ("double".equals(type)) {
        mapper = (columnVector, row) -> ((DoubleColumnVector) columnVector).vector[row];
    } else if ("decimal".equals(type)) {
        mapper = (columnVector, row) -> ((DecimalColumnVector) columnVector).vector[row].getHiveDecimal().bigDecimalValue();
    } else if ("string".equals(type) || type.startsWith("varchar")) {
        mapper = (columnVector, row) -> ((BytesColumnVector) columnVector).toString(row);
    } else if ("char".equals(type)) {
        mapper = (columnVector, row) -> ((BytesColumnVector) columnVector).toString(row).charAt(0);
    } else if ("timestamp".equals(type)) {
        mapper = (columnVector, row) -> ((TimestampColumnVector) columnVector).getTimestampAsLong(row);
    } else {
        throw new RuntimeException("Unsupported type " + type);
    }
    return mapper;
}
This method creates a BiFunction for each type. Given a ColumnVector and a row number, it returns the value stored in that ColumnVector at the given index.

Modify the read method as shown below to use the createColumnReader method.

package com.javahelps.orc;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.*;
import org.apache.orc.OrcFile;
import org.apache.orc.Reader;
import org.apache.orc.RecordReader;
import org.apache.orc.TypeDescription;

import java.io.IOException;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

public class OrcFileReader {
    private static final int BATCH_SIZE = 2048;

    public static void main(String[] args) throws IOException {
        List<Map<String, Object>> rows = read(new Configuration(), "orders.orc");
        for (Map<String, Object> row : rows) {
            System.out.println(row);
        }
    }

    public static List<Map<String, Object>> read(Configuration configuration, String path)
            throws IOException {
        // Create a list to collect rows
        List<Map<String, Object>> rows = new LinkedList<>();

        // Create an ORC reader using the Hadoop fileSystem and path
        try (Reader reader = OrcFile.createReader(new Path(path), OrcFile.readerOptions(configuration))) {
            // Extract the schema and metadata from the reader
            TypeDescription schema = reader.getSchema();
            List<String> fieldNames = schema.getFieldNames();
            List<TypeDescription> columnTypes = schema.getChildren();

            // Get the column vector references
            int size = fieldNames.size();
            BiFunction<ColumnVector, Integer, Object>[] mappers = new BiFunction[size];
            for (int i = 0; i < size; i++) {
                TypeDescription type = columnTypes.get(i);
                mappers[i] = createColumnReader(type);
            }

            try (RecordReader records = reader.rows(reader.options())) {
                // Read rows in batch for better performance.
                VectorizedRowBatch batch = reader.getSchema().createRowBatch(BATCH_SIZE);
                while (records.nextBatch(batch)) {
                    for (int row = 0; row < batch.size; row++) {
                        // Read rows from the batch
                        Map<String, Object> map = new HashMap<>();
                        for (int col = 0; col < batch.numCols; col++) {
                            ColumnVector columnVector = batch.cols[col];
                            if (columnVector.isNull[row]) {
                                map.put(fieldNames.get(col), null);
                            } else {
                                Object value = mappers[col].apply(columnVector, row);
                                map.put(fieldNames.get(col), value);
                            }
                        }
                        rows.add(map);
                    }
                }
            }
        }
        return rows;
    }

    public static BiFunction<ColumnVector, Integer, Object> createColumnReader(TypeDescription description) {
        // Reference: https://orc.apache.org/docs/core-java.html
        String type = description.getCategory().getName();
        BiFunction<ColumnVector, Integer, Object> mapper;
        if ("tinyint".equals(type)) {
            mapper = (columnVector, row) -> (byte) ((LongColumnVector) columnVector).vector[row];
        } else if ("smallint".equals(type)) {
            mapper = (columnVector, row) -> (short) ((LongColumnVector) columnVector).vector[row];
        } else if ("int".equals(type) || "date".equals(type)) {
            // Date is represented as int epoch days
            mapper = (columnVector, row) -> (int) ((LongColumnVector) columnVector).vector[row];
        } else if ("bigint".equals(type)) {
            mapper = (columnVector, row) -> ((LongColumnVector) columnVector).vector[row];
        } else if ("boolean".equals(type)) {
            mapper = (columnVector, row) -> ((LongColumnVector) columnVector).vector[row] == 1;
        } else if ("float".equals(type)) {
            mapper = (columnVector, row) -> (float) ((DoubleColumnVector) columnVector).vector[row];
        } else if ("double".equals(type)) {
            mapper = (columnVector, row) -> ((DoubleColumnVector) columnVector).vector[row];
        } else if ("decimal".equals(type)) {
            mapper = (columnVector, row) -> ((DecimalColumnVector) columnVector).vector[row].getHiveDecimal().bigDecimalValue();
        } else if ("string".equals(type) || type.startsWith("varchar")) {
            mapper = (columnVector, row) -> ((BytesColumnVector) columnVector).toString(row);
        } else if ("char".equals(type)) {
            mapper = (columnVector, row) -> ((BytesColumnVector) columnVector).toString(row).charAt(0);
        } else if ("timestamp".equals(type)) {
            mapper = (columnVector, row) -> ((TimestampColumnVector) columnVector).getTimestampAsLong(row);
        } else {
            throw new RuntimeException("Unsupported type " + type);
        }
        return mapper;
    }
}
The above change prints the same result, but without hard-coding the ColumnVector types.

Further optimizations can make the OrcFileReader faster. Big Data engines usually break an ORC file into small chunks and read them in parallel, in addition to several other optimizations. Adding concurrency is out of the scope of this article. Instead, the following section explains how to skip reading unnecessary columns from an ORC file.

Filtering Columns

The ORC Reader#rows method takes a Reader.Options object. Using this object, we can control which columns are read. A boolean array marks the columns we want to read from the ORC file.
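
The indices of this boolean array are TypeDescription ids rather than field positions: the root struct itself has id 0, and every nested column gets the next id. The following sketch, based on the orders schema from the earlier steps, illustrates the numbering and why the array needs getMaximumId() + 1 entries.

// A minimal sketch: TypeDescription ids for the orders schema
TypeDescription schema = TypeDescription.fromString("struct<order_id:int,item_name:string,price:float>");
System.out.println(schema.getId());        // 0 -> the root struct
System.out.println(schema.getMaximumId()); // 3 -> the last nested column
List<String> fieldNames = schema.getFieldNames();
List<TypeDescription> children = schema.getChildren();
for (int i = 0; i < fieldNames.size(); i++) {
    // order_id -> 1, item_name -> 2, price -> 3
    System.out.println(fieldNames.get(i) + " -> " + children.get(i).getId());
}
// Hence the include array has getMaximumId() + 1 = 4 entries, indexed by id.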

Create a new static method as shown below. This method constructs the boolean array to select columns and adds the selected column indices to a list for later reference.

public static boolean[] createColumnsToRead(TypeDescription schema, Set<String> columns, List<Integer> indices) {
    // Create an array of boolean
    boolean[] columnsToRead = new boolean[schema.getMaximumId() + 1];
    List<String> fieldNames = schema.getFieldNames();
    List<TypeDescription> columnTypes = schema.getChildren();
    for (int i = 0; i < fieldNames.size(); i++) {
        if (columns.contains(fieldNames.get(i))) {
            indices.add(i);
            TypeDescription type = columnTypes.get(i);
            for (int id = type.getId(); id <= type.getMaximumId(); id++) {
                columnsToRead[id] = true;
            }
        }
    }
    return columnsToRead;
}
This method can be used in the read method of the OrcFileReader as shown below. Note how the list of selected indices is used to create the column mappers and to read data from the VectorizedRowBatch.

package com.javahelps.orc;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.*;
import org.apache.orc.OrcFile;
import org.apache.orc.Reader;
import org.apache.orc.RecordReader;
import org.apache.orc.TypeDescription;

import java.io.IOException;
import java.util.*;
import java.util.function.BiFunction;

public class OrcFileReader {
    private static final int BATCH_SIZE = 2048;

    public static void main(String[] args) throws IOException {
        List<Map<String, Object>> rows = read(new Configuration(), "orders.orc");
        for (Map<String, Object> row : rows) {
            System.out.println(row);
        }
    }

    public static List<Map<String, Object>> read(Configuration configuration, String path)
            throws IOException {
        // Create a list to collect rows
        List<Map<String, Object>> rows = new LinkedList<>();

        // Create an ORC reader using the Hadoop fileSystem and path
        try (Reader reader = OrcFile.createReader(new Path(path), OrcFile.readerOptions(configuration))) {
            // Extract the schema and metadata from the reader
            TypeDescription schema = reader.getSchema();
            List<String> fieldNames = schema.getFieldNames();
            List<TypeDescription> columnTypes = schema.getChildren();

            // Select only order_id and price
            List<Integer> selectedColumns = new ArrayList<>();
            boolean[] columnsToRead = createColumnsToRead(schema, Set.of("order_id", "price"), selectedColumns);

            // Get the column vector references
            int size = fieldNames.size();
            BiFunction<ColumnVector, Integer, Object>[] mappers = new BiFunction[size];
            for (int i : selectedColumns) {
                TypeDescription type = columnTypes.get(i);
                mappers[i] = createColumnReader(type);
            }

            // Pass the columnsToRead to the reader to read only the selected columns
            try (RecordReader records = reader.rows(reader.options().include(columnsToRead))) {
                // Read rows in batch for better performance.
                VectorizedRowBatch batch = reader.getSchema().createRowBatch(BATCH_SIZE);
                while (records.nextBatch(batch)) {
                    for (int row = 0; row < batch.size; row++) {
                        // Read rows from the batch
                        Map<String, Object> map = new HashMap<>(selectedColumns.size());
                        for (int col : selectedColumns) {
                            ColumnVector columnVector = batch.cols[col];
                            if (columnVector.isNull[row]) {
                                map.put(fieldNames.get(col), null);
                            } else {
                                Object value = mappers[col].apply(columnVector, row);
                                map.put(fieldNames.get(col), value);
                            }
                        }
                        rows.add(map);
                    }
                }
            }
        }
        return rows;
    }

    public static boolean[] createColumnsToRead(TypeDescription schema, Set<String> columns, List<Integer> indices) {
        // Create an array of boolean
        boolean[] columnsToRead = new boolean[schema.getMaximumId() + 1];
        List<String> fieldNames = schema.getFieldNames();
        List<TypeDescription> columnTypes = schema.getChildren();
        for (int i = 0; i < fieldNames.size(); i++) {
            if (columns.contains(fieldNames.get(i))) {
                indices.add(i);
                TypeDescription type = columnTypes.get(i);
                for (int id = type.getId(); id <= type.getMaximumId(); id++) {
                    columnsToRead[id] = true;
                }
            }
        }
        return columnsToRead;
    }

    public static BiFunction<ColumnVector, Integer, Object> createColumnReader(TypeDescription description) {
        // Reference: https://orc.apache.org/docs/core-java.html
        String type = description.getCategory().getName();
        BiFunction<ColumnVector, Integer, Object> mapper;
        if ("tinyint".equals(type)) {
            mapper = (columnVector, row) -> (byte) ((LongColumnVector) columnVector).vector[row];
        } else if ("smallint".equals(type)) {
            mapper = (columnVector, row) -> (short) ((LongColumnVector) columnVector).vector[row];
        } else if ("int".equals(type) || "date".equals(type)) {
            // Date is represented as int epoch days
            mapper = (columnVector, row) -> (int) ((LongColumnVector) columnVector).vector[row];
        } else if ("bigint".equals(type)) {
            mapper = (columnVector, row) -> ((LongColumnVector) columnVector).vector[row];
        } else if ("boolean".equals(type)) {
            mapper = (columnVector, row) -> ((LongColumnVector) columnVector).vector[row] == 1;
        } else if ("float".equals(type)) {
            mapper = (columnVector, row) -> (float) ((DoubleColumnVector) columnVector).vector[row];
        } else if ("double".equals(type)) {
            mapper = (columnVector, row) -> ((DoubleColumnVector) columnVector).vector[row];
        } else if ("decimal".equals(type)) {
            mapper = (columnVector, row) -> ((DecimalColumnVector) columnVector).vector[row].getHiveDecimal().bigDecimalValue();
        } else if ("string".equals(type) || type.startsWith("varchar")) {
            mapper = (columnVector, row) -> ((BytesColumnVector) columnVector).toString(row);
        } else if ("char".equals(type)) {
            mapper = (columnVector, row) -> ((BytesColumnVector) columnVector).toString(row).charAt(0);
        } else if ("timestamp".equals(type)) {
            mapper = (columnVector, row) -> ((TimestampColumnVector) columnVector).getTimestampAsLong(row);
        } else {
            throw new RuntimeException("Unsupported type " + type);
        }
        return mapper;
    }
}
That's all I planned for this article. I hope it helps you to read and write ORC files using core Java. If you have any questions or feedback, please comment below. You can find the source code of this project in the GitHub repository.



