Arrow IPC

IPC options

struct arrow::ipc::IpcReadOptions

Options for reading Arrow IPC messages.

Public Members

int max_recursion_depth = kMaxNestingDepth

The maximum permitted schema nesting depth.

MemoryPool *memory_pool = default_memory_pool()

The memory pool to use for allocations made during IPC reading.

While Arrow IPC is predominantly zero-copy, it may have to allocate memory in some cases (for example if compression is enabled).

std::vector<int> included_fields

EXPERIMENTAL: Top-level schema fields to include when deserializing RecordBatch.

If empty (the default), return all deserialized fields. If non-empty, the values are the indices of fields in the top-level schema.

bool use_threads = true

Use the global CPU thread pool to parallelize computational tasks such as decompression.

struct arrow::ipc::IpcWriteOptions

Options for writing Arrow IPC messages.

Public Members

bool allow_64bit = false

If true, allow field lengths that don’t fit in a signed 32-bit int.

Some implementations may not be able to parse streams created with this option.

int max_recursion_depth = kMaxNestingDepth

The maximum permitted schema nesting depth.

int32_t alignment = 8

Write padding after memory buffers up to this multiple of bytes.

bool write_legacy_ipc_format = false

Write the pre-0.15.0 IPC message format.

This legacy format prefixes each message with a 4-byte length instead of the current 8-byte prefix (a 4-byte continuation marker followed by the 4-byte length).

MemoryPool *memory_pool = default_memory_pool()

The memory pool to use for allocations made during IPC writing.

While Arrow IPC is predominantly zero-copy, it may have to allocate memory in some cases (for example if compression is enabled).

std::shared_ptr<util::Codec> codec

Compression codec to use for record batch body buffers.

May only be UNCOMPRESSED, LZ4_FRAME or ZSTD.

bool use_threads = true

Use the global CPU thread pool to parallelize computational tasks such as compression.

bool emit_dictionary_deltas = false

Whether to emit dictionary deltas.

If false, a changed dictionary for a given field emits a full dictionary replacement. If true, a changed dictionary is compared against the previous version; if possible, a dictionary delta is emitted, otherwise a full dictionary replacement.

Default is false to maximize stream compatibility.

Also, note that if a changed dictionary is a nested dictionary, then a delta is never emitted, for compatibility with the read path.

MetadataVersion metadata_version = MetadataVersion::V5

Format version to use for IPC messages and their metadata.

The default is V5 (readable by Arrow 1.0.0 and later). V4 is also available (readable by 0.8.0 and later).

Reading IPC streams and files

Blocking API

Use either of these two classes, depending on which IPC format you want to read. The file format requires a random-access file, while the stream format only requires a sequential input stream.

class arrow::ipc::RecordBatchStreamReader : public arrow::RecordBatchReader

Synchronous batch stream reader that reads from io::InputStream.

This class reads the schema (plus any dictionaries) as the first messages in the stream, followed by record batches. For more granular zero-copy reads, see the ReadRecordBatch functions.

Public Functions

ReadStats stats() const = 0

Return current read statistics.

Public Static Functions

Result<std::shared_ptr<RecordBatchStreamReader>> Open(std::unique_ptr<MessageReader> message_reader, const IpcReadOptions &options = IpcReadOptions::Defaults())

Create batch reader from generic MessageReader.

This will take ownership of the given MessageReader.

Return

the created batch reader

Parameters
  • [in] message_reader: a MessageReader implementation

  • [in] options: any IPC reading options (optional)

Result<std::shared_ptr<RecordBatchStreamReader>> Open(io::InputStream *stream, const IpcReadOptions &options = IpcReadOptions::Defaults())

Record batch stream reader from InputStream.

Return

the created batch reader

Parameters
  • [in] stream: an input stream instance. It must stay alive throughout the lifetime of the stream reader

  • [in] options: any IPC reading options (optional)

Result<std::shared_ptr<RecordBatchStreamReader>> Open(const std::shared_ptr<io::InputStream> &stream, const IpcReadOptions &options = IpcReadOptions::Defaults())

Open stream and retain ownership of stream object.

Return

the created batch reader

Parameters
  • [in] stream: the input stream

  • [in] options: any IPC reading options (optional)

class arrow::ipc::RecordBatchFileReader

Reads the record batch file format.

Public Functions

std::shared_ptr<Schema> schema() const = 0

The schema read from the file.

int num_record_batches() const = 0

Returns the number of record batches in the file.

MetadataVersion version() const = 0

Return the metadata version from the file metadata.

std::shared_ptr<const KeyValueMetadata> metadata() const = 0

Return the contents of the custom_metadata field from the file’s Footer.

Result<std::shared_ptr<RecordBatch>> ReadRecordBatch(int i) = 0

Read a particular record batch from the file.

Does not copy memory if the input source supports zero-copy.

Return

the read batch

Parameters
  • [in] i: the index of the record batch to return

ReadStats stats() const = 0

Return current read statistics.

Public Static Functions

Result<std::shared_ptr<RecordBatchFileReader>> Open(io::RandomAccessFile *file, const IpcReadOptions &options = IpcReadOptions::Defaults())

Open a RecordBatchFileReader.

Open a file-like object that is assumed to be self-contained; i.e., the end of the file interface is the end of the Arrow file. Note that there can be any amount of data preceding the Arrow-formatted data, because we need only locate the end of the Arrow file stream to discover the metadata and then proceed to read the data into memory.

Result<std::shared_ptr<RecordBatchFileReader>> Open(io::RandomAccessFile *file, int64_t footer_offset, const IpcReadOptions &options = IpcReadOptions::Defaults())

Open a RecordBatchFileReader.

If the file is embedded within some larger file or memory region, you can pass the absolute memory offset to the end of the file (which contains the metadata footer). The metadata must have been written with memory offsets relative to the start of the containing file.

Return

the returned reader

Parameters
  • [in] file: the data source

  • [in] footer_offset: the position of the end of the Arrow file

  • [in] options: options for IPC reading

Result<std::shared_ptr<RecordBatchFileReader>> Open(const std::shared_ptr<io::RandomAccessFile> &file, const IpcReadOptions &options = IpcReadOptions::Defaults())

Version of Open that retains ownership of file.

Return

the returned reader

Parameters
  • [in] file: the data source

  • [in] options: options for IPC reading

Result<std::shared_ptr<RecordBatchFileReader>> Open(const std::shared_ptr<io::RandomAccessFile> &file, int64_t footer_offset, const IpcReadOptions &options = IpcReadOptions::Defaults())

Version of Open that retains ownership of file.

Return

the returned reader

Parameters
  • [in] file: the data source

  • [in] footer_offset: the position of the end of the Arrow file

  • [in] options: options for IPC reading

Event-driven API

To read an IPC stream in an event-driven fashion, you must implement a Listener subclass that you pass to a StreamDecoder.

class arrow::ipc::Listener

A general listener class to receive events.

Implement callback methods for the events you are interested in.

This API is EXPERIMENTAL.

Since

0.17.0

Subclassed by arrow::ipc::CollectListener

Public Functions

Status OnEOS()

Called when end-of-stream is received.

The default implementation just returns arrow::Status::OK().

Return

Status

See

StreamDecoder

Status OnRecordBatchDecoded(std::shared_ptr<RecordBatch> record_batch)

Called when a record batch is decoded.

The default implementation just returns arrow::Status::NotImplemented().

Return

Status

See

StreamDecoder

Parameters
  • [in] record_batch: a record batch decoded

Status OnSchemaDecoded(std::shared_ptr<Schema> schema)

Called when a schema is decoded.

The default implementation just returns arrow::Status::OK().

Return

Status

See

StreamDecoder

Parameters
  • [in] schema: a schema decoded

class arrow::ipc::StreamDecoder

Push-style stream decoder that receives data from the user.

This class decodes the Apache Arrow IPC streaming format data.

This API is EXPERIMENTAL.

See

https://arrow.apache.org/docs/format/Columnar.html#ipc-streaming-format

Since

0.17.0

Public Functions

StreamDecoder(std::shared_ptr<Listener> listener, IpcReadOptions options = IpcReadOptions::Defaults())

Construct a stream decoder.

Parameters
  • [in] listener: a Listener that receives decoded events

  • [in] options: any IPC reading options (optional)

Status Consume(const uint8_t *data, int64_t size)

Feed raw data to the decoder.

If the decoder can read one or more record batches from the data, it calls listener->OnRecordBatchDecoded() once for each decoded record batch.

Return

Status

Parameters
  • [in] data: the raw data to process. This data isn't copied; the passed memory must be kept alive until record batch processing finishes.

  • [in] size: the size of the raw data.

Status Consume(std::shared_ptr<Buffer> buffer)

Feed data to the decoder as a Buffer.

If the decoder can read one or more record batches from the Buffer, it calls listener->OnRecordBatchDecoded() once for each decoded record batch.

Return

Status

Parameters
  • [in] buffer: a Buffer to be processed.

std::shared_ptr<Schema> schema() const

Return

the shared schema of the record batches in the stream

int64_t next_required_size() const

Return the number of bytes needed to advance the state of the decoder.

This method is provided for users who want to optimize performance; most users don't need it.

Here is an example usage for normal users:

decoder.Consume(buffer1);
decoder.Consume(buffer2);
decoder.Consume(buffer3);

The decoder has an internal buffer. If the consumed data isn't enough to advance the decoder's state, it is accumulated in the internal buffer, which causes performance overhead.

If you pass exactly next_required_size() bytes to each Consume() call, the decoder doesn't use its internal buffer, which improves performance.

Here is an example usage to avoid using internal buffer:

buffer1 = get_data(decoder.next_required_size());
decoder.Consume(buffer1);
buffer2 = get_data(decoder.next_required_size());
decoder.Consume(buffer2);

Users can also use this method to avoid creating small chunks. Record batch data must be contiguous, so if small chunks are passed, the decoder must concatenate them internally, which causes performance overhead.

Here is an example usage to reduce small chunks:

buffer = AllocateResizableBuffer();
while ((small_chunk = get_data(&small_chunk_size))) {
  auto current_buffer_size = buffer->size();
  buffer->Resize(current_buffer_size + small_chunk_size);
  memcpy(buffer->mutable_data() + current_buffer_size,
         small_chunk,
         small_chunk_size);
  if (buffer->size() < decoder.next_required_size()) {
    continue;
  }
  std::shared_ptr<arrow::Buffer> chunk(buffer.release());
  decoder.Consume(chunk);
  buffer = AllocateResizableBuffer();
}
if (buffer->size() > 0) {
  std::shared_ptr<arrow::Buffer> chunk(buffer.release());
  decoder.Consume(chunk);
}

Return

the number of bytes needed to advance the state of the decoder

ReadStats stats() const

Return current read statistics.

Statistics

struct arrow::ipc::ReadStats

Public Members

int64_t num_messages = 0

Number of IPC messages read.

int64_t num_record_batches = 0

Number of record batches read.

int64_t num_dictionary_batches = 0

Number of dictionary batches read.

Note: num_dictionary_batches >= num_dictionary_deltas + num_replaced_dictionaries

int64_t num_dictionary_deltas = 0

Number of dictionary deltas read.

int64_t num_replaced_dictionaries = 0

Number of replaced dictionaries (i.e. where a dictionary batch replaces an existing dictionary with an unrelated new dictionary).

Writing IPC streams and files

Blocking API

The IPC stream format is only optionally terminated, whereas the IPC file format must include a terminating footer. Thus a writer of the IPC file format must be explicitly finalized with Close(), or the resulting file will be corrupt.

Result<std::shared_ptr<RecordBatchWriter>> MakeStreamWriter(io::OutputStream *sink, const std::shared_ptr<Schema> &schema, const IpcWriteOptions &options = IpcWriteOptions::Defaults())

Create a new IPC stream writer from stream sink and schema.

User is responsible for closing the actual OutputStream.

Return

Result<std::shared_ptr<RecordBatchWriter>>

Parameters
  • [in] sink: output stream to write to

  • [in] schema: the schema of the record batches to be written

  • [in] options: options for serialization

Result<std::shared_ptr<RecordBatchWriter>> MakeStreamWriter(std::shared_ptr<io::OutputStream> sink, const std::shared_ptr<Schema> &schema, const IpcWriteOptions &options = IpcWriteOptions::Defaults())

Create a new IPC stream writer from stream sink and schema.

User is responsible for closing the actual OutputStream.

Return

Result<std::shared_ptr<RecordBatchWriter>>

Parameters
  • [in] sink: output stream to write to

  • [in] schema: the schema of the record batches to be written

  • [in] options: options for serialization

Result<std::shared_ptr<RecordBatchWriter>> MakeFileWriter(io::OutputStream *sink, const std::shared_ptr<Schema> &schema, const IpcWriteOptions &options = IpcWriteOptions::Defaults(), const std::shared_ptr<const KeyValueMetadata> &metadata = NULLPTR)

Create a new IPC file writer from stream sink and schema.

Return

Result<std::shared_ptr<RecordBatchWriter>>

Parameters
  • [in] sink: output stream to write to

  • [in] schema: the schema of the record batches to be written

  • [in] options: options for serialization, optional

  • [in] metadata: custom metadata for File Footer, optional

Result<std::shared_ptr<RecordBatchWriter>> MakeFileWriter(std::shared_ptr<io::OutputStream> sink, const std::shared_ptr<Schema> &schema, const IpcWriteOptions &options = IpcWriteOptions::Defaults(), const std::shared_ptr<const KeyValueMetadata> &metadata = NULLPTR)

Create a new IPC file writer from stream sink and schema.

Return

Result<std::shared_ptr<RecordBatchWriter>>

Parameters
  • [in] sink: output stream to write to

  • [in] schema: the schema of the record batches to be written

  • [in] options: options for serialization, optional

  • [in] metadata: custom metadata for File Footer, optional

class arrow::ipc::RecordBatchWriter

Abstract interface for writing a stream of record batches.

Subclassed by arrow::flight::MetadataRecordBatchWriter

Public Functions

Status WriteRecordBatch(const RecordBatch &batch) = 0

Write a record batch to the stream.

Return

Status

Parameters
  • [in] batch: the record batch to write to the stream

Status WriteTable(const Table &table)

Write a possibly-chunked table by creating a sequence of record batches.

Return

Status

Parameters
  • [in] table: table to write

Status WriteTable(const Table &table, int64_t max_chunksize)

Write a table with a particular maximum chunk size.

Return

Status

Parameters
  • [in] table: table to write

  • [in] max_chunksize: maximum chunk size for table chunks

Status Close() = 0

Perform any logic necessary to finish the stream.

Return

Status

WriteStats stats() const = 0

Return current write statistics.

Statistics

struct arrow::ipc::WriteStats

Public Members

int64_t num_messages = 0

Number of IPC messages written.

int64_t num_record_batches = 0

Number of record batches written.

int64_t num_dictionary_batches = 0

Number of dictionary batches written.

Note: num_dictionary_batches >= num_dictionary_deltas + num_replaced_dictionaries

int64_t num_dictionary_deltas = 0

Number of dictionary deltas written.

int64_t num_replaced_dictionaries = 0

Number of replaced dictionaries (i.e. where a dictionary batch replaces an existing dictionary with an unrelated new dictionary).