Using Arrow, we can point at a directory of files, treat them as a single dataset, and query them with dplyr syntax.

library(arrow)
library(dplyr)

(dplyr is an optional dependency of arrow, so we have to load both packages to get the dplyr interface.)

I’ve got 10+ years of NYC Taxi trip data in Parquet files, split into directories by year and month: 125 files in total.

# List every file under the working directory, recursively
files <- dir(recursive = TRUE)
head(files)
## [1] "nyc-taxi/2009/01/data.parquet" "nyc-taxi/2009/02/data.parquet"
## [3] "nyc-taxi/2009/03/data.parquet" "nyc-taxi/2009/04/data.parquet"
## [5] "nyc-taxi/2009/05/data.parquet" "nyc-taxi/2009/06/data.parquet"
length(files)
## [1] 125
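
(Incidentally, a layout like this doesn't have to be assembled by hand. If you start from a table that has year and month columns, arrow::write_dataset() can write the whole partitioned directory tree in one call. A sketch, where trips is a hypothetical data frame standing in for your own data:)

# trips is hypothetical: any data frame or Arrow Table with year and month columns
write_dataset(trips, "nyc-taxi", format = "parquet", partitioning = c("year", "month"))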

This is somewhere around 2 billion rows, which even as compressed Parquet files takes up 37 gigabytes on disk.

# Check the on-disk size with du
system("du -h | tail", intern = TRUE) %>% cat(sep = "\n")
## 150M ./nyc-taxi/2016/10
## 150M ./nyc-taxi/2016/07
## 143M ./nyc-taxi/2016/09
## 145M ./nyc-taxi/2016/08
## 255M ./nyc-taxi/2016/01
## 255M ./nyc-taxi/2016/06
## 145M ./nyc-taxi/2016/12
## 2.4G ./nyc-taxi/2016
##  37G ./nyc-taxi
##  37G .
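
(Or, skipping the shell: base R's file.size() can compute the same total from the file listing above.)

# Sum the per-file sizes, in gigabytes
sum(file.size(files)) / 1024^3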

open_dataset() takes a directory argument, and we can also tell it to treat the file path segments as a source of partition information. That gives us year and month as queryable columns, and it lets Arrow skip files entirely when we filter on them.

ds <- open_dataset("nyc-taxi", partitioning = c("year", "month"))
ds
## Dataset
## vendor_id: string
## pickup_at: timestamp[us]
## dropoff_at: timestamp[us]
## passenger_count: int8
## trip_distance: float
## pickup_longitude: float
## pickup_latitude: float
## rate_code_id: string
## store_and_fwd_flag: string
## dropoff_longitude: float
## dropoff_latitude: float
## payment_type: string
## fare_amount: float
## extra: float
## mta_tax: float
## tip_amount: float
## tolls_amount: float
## total_amount: float
## improvement_surcharge: float
## pickup_location_id: int32
## dropoff_location_id: int32
## congestion_surcharge: float
## year: int32
## month: int32
##
## See $metadata for additional Schema metadata
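
Note that year and month came through as int32 columns, inferred from the path segments. If you'd rather declare the partition types yourself, partitioning can also be a schema; a sketch:

# Explicitly typed partition fields, instead of letting arrow infer them
ds <- open_dataset("nyc-taxi", partitioning = schema(year = int32(), month = int32()))
# For Hive-style paths like year=2009/month=01, hive_partition() works the same way:
# ds <- open_dataset("nyc-taxi", partitioning = hive_partition(year = int32(), month = int32()))
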
# Time the query: filter, select, and group in Arrow, then aggregate in dplyr
system.time(ds %>%
  filter(total_amount > 100, year == 2015) %>%
  select(tip_amount, total_amount, passenger_count) %>%
  group_by(passenger_count) %>%
  collect() %>%
  summarize(
    tip_pct = median(100 * tip_amount / total_amount),
    n = n()
  ) %>%
  print())
## # A tibble: 10 x 3
##    passenger_count tip_pct      n
##              <int>   <dbl>  <int>
##  1               0    9.84    380
##  2               1   16.7  143087
##  3               2   16.6   34418
##  4               3   14.4    8922
##  5               4   11.4    4771
##  6               5   16.7    5806
##  7               6   16.7    3338
##  8               7   16.7      11
##  9               8   16.7      32
## 10               9   16.7      42
##    user  system elapsed
##  26.735   1.159   4.076

All of the column selection and row filtering (select(), filter()) happens in Arrow, with the work pushed down to the individual files, so we never have to pull the full dataset into memory in order to query it. Data is only read when you collect(). That's also why collect() comes before summarize() in the query above: Arrow handles the scanning and filtering, and the aggregation then runs in regular dplyr on the much smaller filtered result. The scan is parallelized across files, which is why the elapsed time above is a fraction of the user (CPU) time.

ds %>%
  filter(total_amount > 100, year == 2015) %>%
  select(tip_amount, total_amount, passenger_count) %>%
  group_by(passenger_count)
## Dataset (query)
## tip_amount: float
## total_amount: float
## passenger_count: int8
##
## * Filter: ((total_amount > 100:double) and (year == 2015:double))
## * Grouped by passenger_count
## See $.data for the source Arrow object
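
(As an aside for readers on newer arrow releases, roughly 6.0 onward: the aggregation can also run inside Arrow, so summarize() can come before collect(). A sketch; note that Arrow evaluates median() as an approximate median:)

ds %>%
  filter(total_amount > 100, year == 2015) %>%
  select(tip_amount, total_amount, passenger_count) %>%
  group_by(passenger_count) %>%
  summarize(
    tip_pct = median(100 * tip_amount / total_amount),
    n = n()
  ) %>%
  collect()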

Back to slides