class: center, middle, inverse, title-slide

# Fast Data Access in R and Python with Apache Arrow
### Neal Richardson
### @enpiar
### October 27, 2020
Slides:
enpiar.com/talks/odsc-west-2020/
---
layout: true

<div class="my-footer"><span>https://enpiar.com/talks/odsc-west-2020/</span></div>

---

# Ursa Labs

.cols[
.fifty[
<img src="../rstudio-conf-2020/img/ursa.png" />

https://ursalabs.org
]
.fifty[
* Build cross-language, open libraries for data science
* Grow **Apache Arrow** ecosystem
* Funding and employment for full-time developers
* **Not-for-profit**, funded by multiple corporations
]
]

---

# Overview

* Apache Arrow: what it is, why it is compelling
* Getting started with Arrow libraries in R and Python
* Examples/code: reading and writing data, querying
* End-to-end data service, Python + R + gRPC

---

# While you're listening...

* To follow along later, make sure you have R and Python installed, plus some NYC Taxi data

https://github.com/r-lib/vroom/blob/master/inst/bench/download-data.sh#L4-L12

```
sudo apt install p7zip-full
mkdir ~/data/ && cd ~/data
taxi_url=https://archive.org/download/nycTaxiTripData2013/trip_fare.7z
wget -O ~/data/trip_fare.7z $taxi_url
7z x trip_fare.7z &> data.out

# fix trailing space in header for every file
ls *trip_fare*.csv | xargs -P 16 sed -i '1 s/, /,/g'

# Move CSVs into own directory
mkdir csv
mv *.csv csv
```

---
class: center, middle, inverse

# Apache Arrow

---

## Current generation data frame (tabular) computing is highly inefficient

* High fraction of compute spent on **serialization** (converting between data formats)
* Inefficient in-memory computing that **fails to fully utilize modern hardware capabilities**
* Much developer time spent building data connectors and maintaining **glue code**

### *Our mission is to make scalable data processing faster, simpler, and more cost-efficient for the world's data scientists*

---

# <img src="https://arrow.apache.org/img/arrow.png" height="100" />

* Started 2016, 1.0 release July 2020. Latest version: 2.0
* Shared foundation for data analysis, built on lessons of existing data frame libraries and databases
* Designed to take advantage of modern hardware

https://arrow.apache.org/

---

# <img src="https://arrow.apache.org/img/arrow.png" height="100" />

.cols[
.fifty[
**Format** for how data is arranged in memory: columnar, language-independent

<img src="../rstudio-conf-2020/img/simd.png" />
]
.fifty[
]
]

---

# <img src="https://arrow.apache.org/img/arrow.png" height="100" />

.cols[
.fifty[
**Format** for how data is arranged in memory: columnar, language-independent

<img src="../rstudio-conf-2020/img/simd.png" />
]
.fifty[
**Implementations** or bindings in 12 languages

<img src="../rstudio-conf-2020/img/language_logos.png" />

... and more
]
]

---

# Thriving open-source community

<img src="../nyr-2020/img/arrow-contributors.png" />

---

# Lots of facets

**Flight**: client-server framework for fast transport of data
https://arrow.apache.org/blog/2019/10/13/introducing-arrow-flight/

**Plasma**: shared-memory object store
https://arrow.apache.org/blog/2017/08/08/plasma-in-memory-object-store/

**Gandiva**: LLVM expression compiler
http://arrow.apache.org/blog/2018/12/05/gandiva-donation/

and more

---
background-image: url(../rstudio-conf-2020/img/community.png)

# Lots of projects are using Arrow

???

* Lots of projects using Arrow as an efficient format to work with data and to transfer it
* TODO specific projects to name-check (Tensorflow exchange? Athena federated query?)

---
background-image: url(../rstudio-conf-2020/img/in-n-out.jpg)

# Interoperability

???

Interchange format: e.g. get data from Spark more efficiently,
rather than write out to CSV, which is row based, and have to read from disk, parse strings, infer types, transpose to columns.
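---

# Hands on with the format

A minimal sketch of what the columnar format looks like from R: build a Table in Arrow memory and inspect its schema. This example is illustrative; `Table$create()` and the `$schema` field are part of the arrow R package.

```r
library(arrow)

# Columns are typed Arrow arrays, not R vectors in a data.frame
tab <- Table$create(
  x = 1:5,
  y = c("a", "b", "c", "d", "e")
)

tab$schema    # x: int32, y: string -- a schema any Arrow implementation can read
tab$num_rows
```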
---

# Example: Spark and R

https://arrow.apache.org/blog/2019/01/25/r-spark-improvements/

<img src="../rstudio-conf-2020/img/spark.png" style="box-shadow: 5px 5px 5px, -1px -1px 1px"/>

Up to 40x speedup when pulling data from Spark to R

All you have to do is `library(arrow)`

---

# Arrow is language independent

Improve collaboration with non-R users

Lets us access, in R, projects from other languages

---
class: center, middle, inverse

# Let's get started

---

# Arrow R package installation

### CRAN release

```r
install.packages("arrow")
```

https://arrow.apache.org/docs/r/

### Conda

```
conda install -c conda-forge r-arrow
```

### Nightly dev builds

```r
arrow::install_arrow(nightly = TRUE)
```

https://ursalabs.org/arrow-r-nightly/

---

# Arrow R package installation

Special considerations on Linux:

* Set the environment variable `NOT_CRAN=true` to install the fully-featured package. By default, compression libraries and AWS S3 support are not built.
* Optional S3 support requires `libcurl` and `openssl` from the system, plus a modern-enough compiler. No other system dependencies are required.

```
# deb
apt-get install -y libcurl4-openssl-dev libssl-dev

# rpm
yum install -y libcurl-devel openssl-devel
```

https://arrow.apache.org/docs/r/articles/install.html

---

# TODAY ONLY

You should apply a patch for a performance regression in 2.0.0:
https://github.com/apache/arrow/pull/8536

```shell
git clone https://github.com/apache/arrow
cd arrow
git checkout apache-arrow-2.0.0

# URL broken to multiple lines to fit on slide
base_url=https://patch-diff.githubusercontent.com/raw
url=$base_url/apache/arrow/pull/8536.diff
wget $url && git apply 8536.diff && rm 8536.diff

export NOT_CRAN=true
export MAKEFLAGS=-j`nproc`
R CMD INSTALL r
```

---

# pyarrow installation

### Release version

```
# Wheel
pip install pyarrow

# conda
conda install -c conda-forge pyarrow
```

https://arrow.apache.org/docs/python/

### Nightly dev builds

```
# Wheel
pip install \
  --extra-index-url https://pypi.fury.io/arrow-nightlies/ \
  --pre pyarrow

# conda
conda install -c arrow-nightlies pyarrow
```

---

# pyarrow installation

Can also install `pyarrow` from R for use with `reticulate`:

```r
arrow::install_pyarrow()
```

Optional `nightly` argument here too

https://arrow.apache.org/docs/r/reference/install_pyarrow.html

---
class: center, middle, inverse

# Reading and writing data

---

# Objectives

* Demonstrate functions for using Arrow to read data into R
* Compare performance with other readers
* Discuss tradeoffs/benefits of binary file formats (Parquet, Feather) and show how Arrow can reduce both read time and storage size
* Show how you can get even more performance benefits by directly computing on Arrow memory

---

# Data

NYC taxi trip data, 2013: 12 CSVs

```
$ ls -lh
total 19G
-rwxrwxr-x 1 ubuntu ubuntu 1.6G Oct 26 19:53 trip_fare_1.csv
-rwxrwxr-x 1 ubuntu ubuntu 1.6G Oct 26 19:54 trip_fare_10.csv
-rwxrwxr-x 1 ubuntu ubuntu 1.6G Oct 26 19:54 trip_fare_11.csv
-rwxrwxr-x 1 ubuntu ubuntu 1.5G Oct 26 19:55 trip_fare_12.csv
-rwxrwxr-x 1 ubuntu ubuntu 1.5G Oct 26 19:55 trip_fare_2.csv
-rwxrwxr-x 1 ubuntu ubuntu 1.7G Oct 26 19:55 trip_fare_3.csv
-rwxrwxr-x 1 ubuntu ubuntu 1.7G Oct 26 19:56 trip_fare_4.csv
-rwxrwxr-x 1 ubuntu ubuntu 1.7G Oct 26 19:56 trip_fare_5.csv
-rwxrwxr-x 1 ubuntu ubuntu 1.6G Oct 26 19:57 trip_fare_6.csv
-rwxrwxr-x 1 ubuntu ubuntu 1.5G Oct 26 19:57 trip_fare_7.csv
-rwxrwxr-x 1 ubuntu ubuntu 1.4G Oct 26 19:58 trip_fare_8.csv
-rwxrwxr-x 1 ubuntu ubuntu 1.5G Oct 26 19:58 trip_fare_9.csv
```
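---

# Aside: how the timings were measured

The timings on the following slides have the shape of `system.time()` output (`user`, `system`, `elapsed`). A sketch of reproducing them yourself; the path assumes the download script from the start of the talk, and exact numbers will vary by machine.

```r
library(arrow)

# Elapsed (wall-clock) time is the number to watch
system.time(
  df <- read_csv_arrow("~/data/csv/trip_fare_1.csv")
)
```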
---

# Read a CSV

Let's use one of the taxi data CSVs we downloaded

```r
df <- read_csv_arrow("trip_fare_1.csv")

##    user  system elapsed
##  18.540  16.263   5.855
```

Arguments follow naming conventions from `readr::read_csv()`

https://arrow.apache.org/docs/r/reference/read_delim_arrow.html

---

# Keep the data in Arrow

```r
tab <- read_csv_arrow("trip_fare_1.csv", as_data_frame = FALSE)

##    user  system elapsed
##  14.148   2.672   0.569
```

---

# dplyr on Arrow Table

Only pay for what you use

`dplyr` is not a required dependency of `arrow`, so it needs to be loaded explicitly

```r
library(dplyr)

tab %>%
  filter(payment_type %in% c("CSH", "CRD")) %>%
  select(tip_amount, total_amount, payment_type, vendor_id) %>%
  group_by(payment_type, vendor_id) %>%
  summarize(avg_tip = mean(100 * tip_amount / total_amount))
```

---

# What is R's fastest CSV reader?

--

* `base::read.csv()`

--

* `readr::read_csv()`

--

* `data.table::fread()`

--

* `vroom::vroom()`

--

* Something else? (`arrow::read_csv_arrow()`?)

---

# It depends!

* System specs
  * CPU
  * Memory
  * Operating system
  * File system
  * Libraries, compilers, etc.
* Test data features
  * Size
  * Shape
  * Data types
  * Missing/sparseness
  * ...

---

# How do you compare speed?

Run **benchmarks** to systematically compare code head-to-head

Like a scientific experiment:

* Hold everything else constant so we can attribute speed differences to the code
* Explicit and reproducible

Several R packages can help with this: `microbenchmark`, `bench`, etc.

--

Internal vs. external validity: "YMMV" even with the best benchmarks

---

# vroom's benchmarks

https://vroom.r-lib.org/articles/benchmarks.html

* Tests several different kinds of data files and shapes, including "real" data
* Runs a series of real example workflows
* Honest: `vroom` isn't the fastest at everything
* Reproducible: all scripted and documented

???

* vroom has great benchmarks vignette. Tests several different kinds of data files and shapes, also tests a series of real example workflows that you would do against a dataset. This is because one of the ways vroom gets such good read performance is that it lazily does the parsing and reading into R, which can be expensive. So "read" time might be tiny but processing time is higher than if all of the data were in memory in R.
* One of the especially great things about the vroom benchmarks is that they are fully scripted and reproducible, and include instructions for setting up and running them. Makes it easy for Jim to update them with every vroom release. They're *also* really easy to extend by adding additional R scripts.
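---

# A minimal benchmark

A sketch of the head-to-head idea using the `bench` package mentioned a couple of slides back. This is not the vroom benchmark suite, just the smallest version of the same experiment; results depend heavily on hardware and data shape.

```r
library(bench)

file <- "~/data/csv/trip_fare_1.csv"

bench::mark(
  readr      = readr::read_csv(file),
  data.table = data.table::fread(file),
  arrow      = arrow::read_csv_arrow(file),
  check = FALSE,   # the readers return different classes
  iterations = 3
)
```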
---

# Extending vroom's benchmarks

I added some variants:

--

* `arrow::read_csv_arrow()` to read into an R data.frame, then use dplyr or data.table on that

--

* `arrow::read_csv_arrow(as_data_frame = FALSE)` to return an Arrow Table and compute on that in Arrow (where possible)

--

* `cudf$read_csv()`: Python GPU data frame library built on top of Arrow, called from R via `reticulate`

<img src="../nyr-2020/img/rapids_logo.png" height="80"/>

---

# Extending vroom's benchmarks

Report: https://enpiar.com/talks/nyr-2020/benchmarks.html

Source: [https://github.com/nealrichardson/vroom/tree/arrow-benchmarks](https://github.com/r-lib/vroom/compare/master...nealrichardson:arrow-benchmarks)

* Environment: NVIDIA DGX workstation
  * CPU: 20-core Intel Xeon @ 2.20GHz, 256 GB RAM (2400 MHz), SSD
  * GPU: 4 NVIDIA V100 GPUs, 5,120 CUDA cores and 32 GB memory per GPU
* Dev version of Arrow package for some post-1.0 release enhancements
* `cudf` 0.15 (pre-release nightly build)
* Release version of all other packages, via `conda`

---

# Extending vroom's benchmarks

Report: https://enpiar.com/talks/nyr-2020/benchmarks.html

Source: [https://github.com/nealrichardson/vroom/tree/arrow-benchmarks](https://github.com/r-lib/vroom/compare/master...nealrichardson:arrow-benchmarks)

Example workflow:

```r
({ library(readr); library(dplyr) })
x <- read_csv(file)
print(x)
a <- head(x)
b <- tail(x)
c <- sample_n(x, 100)
d <- filter(x, payment_type == "UNK")
e <- x %>% group_by(payment_type) %>% summarise(avg_tip = mean(tip_amount))
```

---
class: less-padding

<img src="../nyr-2020/img/taxi-single-1.png" width="864" />

---
class: less-padding

<img src="../nyr-2020/img/taxi-single-arrow-1.png" width="864" />

---

# NYC taxi trip data, one file

* Pure arrow (keeping the data in Arrow memory) is overall fastest, even though aggregation still happens in R
* `cudf` is super fast at the grouped mean calculation: 100ms over 15 million rows!

---
class: less-padding

<img src="../nyr-2020/img/taxi-single-df-1.png" width="864" />

---

# NYC taxi trip data, one file

* Pure arrow (keeping the data in Arrow memory) is overall fastest, even though aggregation still happens in R
* `cudf` is super fast at the grouped mean calculation: 100ms over 15 million rows!
* Arrow's CSV reader is significantly faster at producing a `data.frame`: can use it in combination with your favorite R computation packages

---

# All numeric/character

* Benchmarks with two variations of all numeric (random normal distribution) and all random strings
  * 1,000,000 x 25 (long)
  * 100,000 x 1,000 (wide)
* Numeric: Arrow is faster than the rest (except `data.table`) on the "long" data but less outstanding on "wide"
* String: Arrow-in-arrow is like vroom-with-ALTREP in terms of speed (10-20x faster than `data.table`, which is faster than the rest) due to efficiency of Arrow's string data type.

---

# Binary data formats

CSV is not the most efficient format for storing and analyzing data

* Row-based: cheap to append to, more costly to select a column
* Uncompressed (though you could gzip it)
* Requires string parsing: inefficient, lossy
* Cannot unambiguously represent data types

---

# Binary data formats

`arrow` lets you work with two columnar binary formats:

* **Feather**, the Arrow format
* **Parquet**, another standard for columnar data

---

# Binary data formats

Which is better?

--

It depends!

https://arrow.apache.org/faq/

* No serialization/deserialization with Arrow/Feather because it is the in-memory representation.
* Parquet files are generally smaller on disk: support more forms of encoding, compression
* Parquet files have statistics, which applications can use for efficient querying
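---

# Which codecs are available?

Compression comes up on the next slide, and whether a given codec is available depends on how the package was built (recall the Linux `NOT_CRAN` note). A quick sketch of checking before you write, using `arrow::codec_is_available()`:

```r
library(arrow)

codec_is_available("snappy")  # Parquet's default codec
codec_is_available("zstd")    # used for the compressed files on the next slide
codec_is_available("lz4")
```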
---

# Convert to binary formats

```r
write_feather(tab, "~/data/jan.feather")
write_parquet(tab, "~/data/jan.parquet")
```

Optional compression

```r
write_feather(tab, "~/data/jan.feather.zstd", compression = "zstd")

# Parquet uses snappy by default
write_parquet(tab, "~/data/jan.parquet.zstd", compression = "zstd")
```

File sizes:

```
-rw-rw-r-- 1 ubuntu ubuntu 1.2G Oct 27 18:58 jan.feather
-rw-rw-r-- 1 ubuntu ubuntu 555M Oct 27 18:58 jan.feather.zstd
-rw-rw-r-- 1 ubuntu ubuntu 457M Oct 27 18:58 jan.parquet
-rw-rw-r-- 1 ubuntu ubuntu 252M Oct 27 19:00 jan.parquet.zstd
```

---

# Convert to binary formats

Can read CSV with types so we can skip some columns and dictionary-encode others

```r
tab2 <- read_csv_arrow("~/data/csv/trip_fare_1.csv",
  col_names = names(tab),
  col_types = "--ftfdddddd",
  skip = 1,
  as_data_frame = FALSE
)
```

---
class: center, middle, inverse

# Q&A

---
class: center, middle, inverse

# Multi-file datasets

---

# Objectives

* Show how to query larger-than-memory datasets split across multiple files
* Discuss how partitioning and efficient binary formats can speed up your queries
* Learn how to rewrite datasets so they can be queried faster
* Explore alternatives in Python

---

# Datasets

* A single entity that may contain multiple files, possibly with different formats and not-identical schemas, or different types of data sources altogether
* Query interface that lets you select columns and filter rows (aggregation coming in 2021)
* Filters are evaluated efficiently; designed to work in a memory-constrained environment (don't have to be able to load everything in memory)
* Can yield a stream of batches you can iterate over, or can collect those into an Arrow Table (which you can then move to a data frame in R, Pandas, etc.)

---

# Datasets in R

https://arrow.apache.org/docs/r/articles/dataset.html

Point to a directory (possibly nested) and treat it as a single entity

```r
ds <- open_dataset("~/data/csv", format = "csv")
ds
```

---

# Datasets in R

Use `dplyr` to query it

```r
ds %>%
  filter(vendor_id == "CMT") %>%
  select(tip_amount, total_amount, payment_type) %>%
  group_by(payment_type) %>%
  collect() %>%
  summarize(
    avg_tip = mean(100 * tip_amount / total_amount, na.rm = TRUE),
    n = n()
  )
```

---

# Datasets in R

Aggregation is *not yet implemented in arrow*. So we can `select` and `filter` but need to `collect` before we `mutate` or `summarize`

```r
ds %>%
  filter(vendor_id == "CMT") %>%
  select(tip_amount, total_amount, payment_type) %>%
  group_by(payment_type) %>%
* collect() %>%
  summarize(
    avg_tip = mean(100 * tip_amount / total_amount, na.rm = TRUE),
    n = n()
  )
```

---

# Partitioning and predicate pushdown

Filtering happens as efficiently as possible

* If filtering on a partition, we can skip entire files
* If using Parquet, can exclude row groups within the file based on statistics

---

# Partitioning

Dataset divided into files nested in directories, and that directory structure is meaningful

Directory names can include just values, in which case we need to specify the (virtual) column names when we load the dataset.

Or, directory names can be `key=value`: "Hive-style"
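---

# Partitioning, in code

A sketch of what the two naming styles look like when opening a dataset. The directories used here are the ones the next few slides create (`csv2` with bare month values, `parquet` with Hive-style paths).

```r
library(arrow)

# Bare values in the path: we supply the virtual column name ourselves
ds_csv <- open_dataset("~/data/csv2", format = "csv", partitioning = "month")

# Hive-style key=value paths: column names are read from the directory names
ds_pq <- open_dataset("~/data/parquet")
```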
---

# Partitioning

First, let's add a "month" partition by renaming the files

```r
setwd("~/data/csv")
for (d in paste0("../csv2/", 1:12)) {
  dir.create(d, recursive = TRUE)
}
new_paths <- sub(
  "trip_fare_([0-9]+)\\.csv",
  "../csv2/\\1/trip_fare.csv",
  dir()
)
file.copy(dir(), new_paths)
system("tree ../csv2")
```

---

# Dataset writing

New feature in arrow 2.0.0

Illustrate partitioning by writing

```r
setwd("~/data")
ds <- open_dataset("csv2", format = "csv", partitioning = "month")
ds %>%
  group_by(month, vendor_id) %>%
  write_dataset("parquet", format = "parquet")
ds2 <- open_dataset("parquet")
```

---

# Dataset writing

```
$ tree parquet
parquet
├── month=1
│   ├── vendor_id=CMT
│   │   └── part-11.parquet
│   └── vendor_id=VTS
│       └── part-12.parquet
├── month=10
│   ├── vendor_id=CMT
│   │   └── part-17.parquet
│   └── vendor_id=VTS
│       └── part-23.parquet
...
```

---

# Benefit of partitioning

`vendor_id` is a partition key

```r
ds2 %>%
  filter(vendor_id == "CMT") %>%
  select(tip_amount, total_amount, payment_type) %>%
  group_by(payment_type) %>%
  collect() %>%
  summarize(
    avg_tip = mean(100 * tip_amount / total_amount, na.rm = TRUE),
    n = n()
  )
```

---

# Benefit of row group statistics

`total_amount` is rarely above $100, so row group statistics will let us immediately exclude most batches in the Parquet files

```r
ds2 %>%
  filter(vendor_id == "CMT", total_amount > 100) %>%
  select(tip_amount, total_amount, payment_type) %>%
  group_by(payment_type) %>%
  collect() %>%
  summarize(
    avg_tip = mean(100 * tip_amount / total_amount, na.rm = TRUE),
    n = n()
  )

## # A tibble: 4 x 3
##   payment_type avg_tip     n
##   <chr>          <dbl> <int>
## 1 CRD          15.2    58626
## 2 CSH           0.758   8286
## 3 DIS           0.835    369
## 4 NOC           1.31     877

##    user  system elapsed
##   9.341   0.931   0.647
```

The same query against the CSV dataset without partitioning is 5.613s, 9x slower

---

# Now, in Python

The same dataset feature is also available in `pyarrow`, though without the `dplyr` niceties

Alternatively, we can use `dask` on Parquet datasets. This will let us parallelize the aggregation in addition to the filtering

https://nbviewer.jupyter.org/gist/jorisvandenbossche/88fbae6c330da9a9f26394a95a82c708

---
class: center, middle, inverse

# Q&A

---
class: center, middle, inverse

# Data over the network

---

# Objectives

* Explore how to use S3 with `arrow` to read and write data
* Learn about the Flight RPC framework and see how it can help you move data across the network

---

# S3 in arrow

* Can read and write individual files and multi-file datasets directly
* Supports various forms of authentication (key/secret, assume role, etc.)
* Available in both R and Python, with caveats

https://arrow.apache.org/docs/r/articles/fs.html
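---

# S3 authentication, sketched

A sketch of the key/secret form of authentication for a private bucket. The bucket name is hypothetical, and the argument names (`access_key`, `secret_key`) are the S3 filesystem options described in the fs article linked above; the public demo bucket on the following slides needs none of this.

```r
library(arrow)

# s3_bucket() passes extra arguments through to the S3 filesystem
bucket <- s3_bucket(
  "my-private-bucket",
  access_key = Sys.getenv("AWS_ACCESS_KEY_ID"),
  secret_key = Sys.getenv("AWS_SECRET_ACCESS_KEY")
)
ds <- open_dataset(bucket)
```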
---

# Installing with S3 support

* In R, macOS and Windows binary packages have S3 support built in
* On Linux, S3 support is opt-in and requires `libcurl` and `openssl` system dependencies, as well as a modern-enough compiler (gcc >= 4.9 or similar)

---

# Accessing data

For demonstration, we're going to use the public S3 bucket from the arrow dataset vignette.

We're demoing on an `m5.8xlarge` EC2 instance in the same region as this S3 bucket, so we should expect reasonable performance.

```r
ds <- open_dataset("s3://ursa-labs-taxi-data")
```

Can also use `s3_bucket()` if there are options/auth settings that can't (easily) be expressed in a URI

```r
bucket <- s3_bucket("ursa-labs-taxi-data")
ds <- open_dataset(bucket)
```

---

# Querying

```r
ds %>%
  filter(total_amount > 100, year == 2015) %>%
  select(tip_amount, total_amount, passenger_count) %>%
  group_by(passenger_count) %>%
  collect() %>%
  summarize(
    tip_pct = median(100 * tip_amount / total_amount),
    n = n()
  )

##    user  system elapsed
##   6.143   1.755  12.507
```

Slower than if the data were local, but good performance given that we're querying against S3

---

# Browsing and accessing individual files

```r
bucket$ls()

df <- read_parquet(bucket$path("2019/03/data.parquet"))

##    user  system elapsed
##   3.708   1.255   4.746

# Compare to if the file is local:
copy_files(bucket$path("2019/03"), ".")
system.time(df <- read_parquet("data.parquet"))

##    user  system elapsed
##   2.999   0.912   1.337
```

---

# Flight

* A gRPC-based framework for implementing clients and servers that send and receive Arrow columnar data natively
* Uses Protocol Buffers v3 for the client protocol
* Pluggable command execution layer, authentication
* Low-level gRPC optimizations
* Implementations in Python and Java; R can access via `reticulate`

---

# Flight in R

https://arrow.apache.org/docs/r/articles/flight.html

Demo: send data between two machines in EC2

Server: Python

Client: R (via reticulate)

---

# Server

Using the demo server that ships with the R package

We can call this from R, but let's type a little Python

```python
from R.arrow.demo_flight_server import DemoFlightServer

server = DemoFlightServer(host="0.0.0.0", port=8089)
server.serve()
```

---

# Client

In R:

```r
client <- flight_connect("172.31.12.197", "8089")

# Send data
b <- record_batch(df)
system.time(push_data(client, b, "taxi"))

##    user  system elapsed
##   0.000   0.227   2.090

# Fetch data
system.time(b2 <- flight_get(client, "taxi"))

##    user  system elapsed
##   0.540   1.245   2.094
```

Data as Feather on disk: 341M

Works out to 1.33 gigabits/second

---
class: center, middle, inverse

# Q&A
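---

# Bonus: putting the pieces together

A closing sketch combining the pieces from this talk: query the partitioned Parquet dataset with `dplyr`, keep the result in Arrow memory, and publish it over Flight so another process (R or Python) can fetch it. The paths, host, and port are the ones used earlier in the demo; aggregation would still happen in R after `collect()`.

```r
library(arrow)
library(dplyr)

# Query the partitioned Parquet dataset written earlier
ds <- open_dataset("~/data/parquet")
cmt_tips <- ds %>%
  filter(vendor_id == "CMT") %>%
  select(tip_amount, total_amount, payment_type) %>%
  collect()

# Publish the result to the Flight server from the previous slides
client <- flight_connect("172.31.12.197", "8089")
push_data(client, record_batch(cmt_tips), "cmt_tips")
```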