class: title-slide, center, middle, inverse

# Using {targets}, {arrow}, Docker, Postgres and the Command Line for Medium Data

### Jared P. Lander

### Chief Data Scientist

<img src="data:image/png;base64,#/home/jared/consulting/talks/images/Lander_logo.png" width="40%" />

???

- Working on a project
- Many moving parts
- The data pipeline
- Medium data considerations
<!-- Start main content -->

---
class: middle, center

# Medium Data

???

- Harder to handle than people think

---
class: middle

- Big enough that it doesn't fit in memory
- Small enough that you don't need Google-scale tools

???

- Big enough to be a pain
- Not big enough for big tools
- Discuss tools

---
class: middle, center

# For Example

???

- Our example

---
background-image: url("data:image/png;base64,#/home/jared/consulting/talks/images/cash_cab_2.jpg")
background-size: cover

???

- Taxis
- Yes, that's Cash Cab
- Yes, I was on it

---
class: middle, center

# [NYC Taxi Data](https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page)

???

- Public data about taxi trips
- https://registry.opendata.aws/nyc-tlc-trip-records-pds/

---
class: middle, center

# Goal

???

- Our goal for today

---
class: middle

- Pull data from S3

???

- Pull data from S3
- If it changed
- Egress

--

- Keep data only for certain areas

???

- Keep data only for certain geographic areas

--

- Keep rides on days where total income was over threshold

???

- Keep rides on days where total income was over $6 Million

--

- Load into database

???

- Load into database
- Some of this is just to illustrate difficult tasks

---
class: section

# Getting the Data

???

- Need to get the data

---
class: middle

```r
aws.s3::save_object(
  object='yellow_tripdata_2015-01.csv',
  bucket='nyc-tlc/trip+data',
  file='/data/public_data/taxis/yellow_tripdata_2015-01.csv'
)
```

???

- Gets the file 'yellow_tripdata_2015-01.csv'
- From bucket 'nyc-tlc/trip+data'
- Saves it in '/data/public_data/taxis/yellow_tripdata_2015-01.csv'

---
class: middle

```r
set.seed(1826)
some_data <- data.table::fread(
  '/data/public_data/taxis/yellow_tripdata_2015-01.csv',
  select=c('tpep_pickup_datetime', 'passenger_count',
           'pickup_longitude', 'pickup_latitude')
) |>
  dplyr::slice_sample(n=1000) |>
  sf::st_as_sf(coords=c('pickup_longitude', 'pickup_latitude'), remove=FALSE)
```

???

- Sample data
- Convert to sf

---
class: middle

```r
library(leaflet)
library(leafgl)

leaflet(elementId='SampleDataMap') |>
  addTiles() |>
  addGlPoints(data=some_data)
```
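
One quick check before cleaning anything up: many of the stray points are pickups recorded at longitude/latitude (0, 0). This is not part of the original pipeline, just a sanity check on the sample created above:

```r
# not a pipeline step -- count sampled pickups recorded at (0, 0),
# which is what shows up in the ocean off Africa on the map
some_data |>
  sf::st_drop_geometry() |>
  dplyr::count(bad_point = pickup_longitude == 0 & pickup_latitude == 0)
```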
??? - What's going on near Africa? - Need to remove points that are outside NYC - addGlPoints() can plot many more points because it uses WebGL --- class: section # Geofilter the Data ??? - Have to remove the bad points --- class: middle, center # Strategies ??? - How do we go about this? --- class: middle - Load data into PostGIS ❌ ??? - Load into PostGIS - That's our end goal, not ready yet -- - Read in all the data, convert to `sf` object, filter out the rows ❌ ??? - Can't be sure all the data will fit into memory -- - Split into multiple files, then read and filter each file ✅ ??? - Conserves memory - Can parallelize the processing --- class: middle, center # Split the Files ??? - First we split the files --- class: middle ```r split_file <- function(file, dest_folder, lines=40000, suffix_length=10) { # remove existing files file.remove(dir(dest_folder, full.names=TRUE)) # get header row header_row <- data.table::fread(input=file, header=FALSE, nrows=1) data.table::fwrite(header_row, file.path(dest_folder, 'header.csv'), col.names=FALSE) # run the bash program "split" * res <- processx::run( 'split', args=c( file, "-l", lines, "--additional-suffix", ".csv", "--numeric-suffixes=1", "-a", suffix_length, dest_folder ) ) # return a list of files that were generated generated_files <- list.files(dest_folder, pattern='\\.csv$', full.names=TRUE) # remove header file from list generated_files <- generated_files[!grepl(pattern='header\\.csv', x=generated_files)] return(generated_files) } ``` ??? - Deletes existing files - saves column names for later - Uses split in bash - could remove header row if we wanted - Returns file names so we can use them later - First of many uses of command line --- class: middle ```r raw_files <- split_file( '/data/public_data/taxis/yellow_tripdata_2015-01.csv', dest_folder='/data/public_data/taxis/splits/raw/' ) ``` ```r dir('/data/public_data/taxis/splits/raw') |> tail() ``` ``` ## [1] "0000000315.csv" "0000000316.csv" "0000000317.csv" "0000000318.csv" ## [5] "0000000319.csv" "header.csv" ``` ??? - We have a bunch of files - And a file holding the column names --- class: middle, center # Map of Desired Area ??? - Need a map of where to keep points --- class: middle ```r nyc <- sf::read_sf( file.path( 'https://services5.arcgis.com/GfwWNkhOj9bNBqoJ/arcgis/rest', 'services/NYC_Community_Districts_Water_Included/FeatureServer/0/', 'query?where=1=1&outSR=4326&f=pgeojson' ) ) |> dplyr::mutate(Boro = BoroCD %/% 100) ``` ??? - Map of NYC - Will use to keep certain records --- class: middle ```r leaflet(elementId='NYCMap') |> addTiles() |> addPolygons(data=nyc, fillColor=~c('red', 'green', 'blue', 'yellow', 'green')[Boro], opacity=0, fillOpacity=.8) ```
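
Before filtering hundreds of files, the geofilter can be previewed on the 1,000-row sample from earlier. This is a sketch, not part of the pipeline, and it assumes both layers are in WGS84 (the sample was built without a CRS, so one is set here):

```r
# a sketch: how many sampled pickups fall inside Manhattan?
manhattan <- dplyr::filter(nyc, Boro == 1)

some_data |>
  sf::st_set_crs(4326) |>
  sf::st_filter(manhattan, .predicate=sf::st_intersects) |>
  nrow()
```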
??? - Want to limit our search here - Actually, just Manhattan - Notice we only use four colors for the map - In keeping with the four color theorem --- class: middle, center # Spatial Filter ??? - Figure out which points fall in Manhattan - For each file --- class: middle ```r remove_points <- function(in_file, out_file, header_file, geo_bounds) { headers <- data.table::fread(header_file) data.table::fread(in_file, col.names=names(headers)) |> tibble::as_tibble() |> dplyr::select( tpep_pickup_datetime, tpep_dropoff_datetime, passenger_count, trip_distance, pickup_longitude, pickup_latitude, dropoff_longitude, dropoff_latitude, payment_type, fare_amount, extra, mta_tax, tip_amount, tolls_amount, improvement_surcharge, total_amount ) |> # convert lat/long to POINT object sf::st_as_sf( coords=c('pickup_longitude', 'pickup_latitude'), remove=FALSE, crs = sf::st_crs(4326) ) |> # only keep points within a certain area * sf::st_filter( geo_bounds, .predicate=sf::st_intersects ) |> # convert geom column to binary # this way fwrite writes a format that psql can copy to a geometry column * dplyr::mutate(the_geom=sf::st_as_binary(geometry, EWKB=TRUE, hex=TRUE)) |> sf::st_drop_geometry() |> data.table::fwrite(file=out_file, col.names=TRUE, scipen=10, na='') return(out_file) } ``` ??? - Read in a single file - fread() for speed - {dplyr} for ease - Convert to sf - filter it based on another sf object - Convert geom part to EWKB so postgres can ingest from CSV - Hex for text instead of raw - Write to another file, with headers - Mix {data.table}, {dplyr}, {sf} --- class: middle ```r file_name_mapping <- tibble::tibble( infile=raw_files, outfile=stringr::str_replace(infile, 'raw', 'filtered') ) file_name_mapping |> head() ``` ``` ## # A tibble: 6 × 2 ## infile outfile ## <chr> <chr> ## 1 /data/public_data/taxis/splits/raw//0000000001.csv /data/public_data/taxis/sp… ## 2 /data/public_data/taxis/splits/raw//0000000002.csv /data/public_data/taxis/sp… ## 3 /data/public_data/taxis/splits/raw//0000000003.csv /data/public_data/taxis/sp… ## 4 /data/public_data/taxis/splits/raw//0000000004.csv /data/public_data/taxis/sp… ## 5 /data/public_data/taxis/splits/raw//0000000005.csv /data/public_data/taxis/sp… ## 6 /data/public_data/taxis/splits/raw//0000000006.csv /data/public_data/taxis/sp… ``` ??? - Before we run it - This allows us to map an input file to output file nicely --- class: middle ```r file.remove(dir('/data/public_data/taxis/splits/filtered/', full.names=TRUE)) ``` ``` ## logical(0) ``` ```r purrr::walk2( # only doing 30 rows for demo file_name_mapping$infile[1:30], file_name_mapping$outfile[1:30], # call our function for each file ~remove_points( in_file=.x, out_file=.y, header_file='/data/public_data/taxis/splits/raw/header.csv', geo_bounds=nyc |> dplyr::filter(Boro == 1) ) ) ``` ??? 
- Only use 30 files for demo - Filter each file separately --- class: middle ```r one_filtered <- readr::read_csv('/data/public_data/taxis/splits/filtered/0000000001.csv') one_filtered ``` ``` ## # A tibble: 36,504 × 17 ## tpep_pickup_datetime tpep_dropoff_datetime passenger_count trip_distance ## <dttm> <dttm> <dbl> <dbl> ## 1 2015-01-15 19:05:39 2015-01-15 19:23:42 1 1.59 ## 2 2015-01-10 20:33:38 2015-01-10 20:53:28 1 3.3 ## 3 2015-01-10 20:33:38 2015-01-10 20:43:41 1 1.8 ## 4 2015-01-10 20:33:39 2015-01-10 20:35:31 1 0.5 ## 5 2015-01-10 20:33:39 2015-01-10 20:52:58 1 3 ## 6 2015-01-10 20:33:39 2015-01-10 20:58:31 1 2.2 ## 7 2015-01-10 20:33:39 2015-01-10 20:42:20 3 0.8 ## 8 2015-01-10 20:33:40 2015-01-10 20:40:44 2 0.9 ## 9 2015-01-10 20:33:40 2015-01-10 20:41:39 1 0.9 ## 10 2015-01-10 20:33:41 2015-01-10 20:43:26 1 1.1 ## # … with 36,494 more rows, and 13 more variables: pickup_longitude <dbl>, ## # pickup_latitude <dbl>, dropoff_longitude <dbl>, dropoff_latitude <dbl>, ## # payment_type <dbl>, fare_amount <dbl>, extra <dbl>, mta_tax <dbl>, ## # tip_amount <dbl>, tolls_amount <dbl>, improvement_surcharge <dbl>, ## # total_amount <dbl>, the_geom <chr> ``` ??? - Only has data for Manhattan - No geometry column - Rather a character column holding Well Known Binary data --- class: section # Filter Rows Based on Aggregation ??? - Only keep rides from days where the daily total was above a threshold - Can't do one file at a time --- class: middle, center # Arrow ??? - Great for medium data - Entire dataset - Not all in memory - dplyr - Installing... --- class: middle ```r source("https://raw.githubusercontent.com/apache/arrow/master/r/R/install-arrow.R") install_arrow() ``` ??? - Can install from CRAN - Installing on Linux can be iffy - Especially if airgapped - https://arrow.apache.org/docs/r/articles/install.html --- class: middle ```r filter_by_cash <- function(data, time_col, amount_col, format='csv', threshold, output) { # not read into memory * filtered <- open_dataset(data, format=format) |> # converts timestamp to just a date dplyr::mutate(Date=cast({{time_col}}, date32())) # computes daily summaries daily_amounts <- filtered |> * dplyr::group_by(Date) |> * dplyr::summarize(TotalFare=sum({{amount_col}}, na.rm=TRUE)) |> * dplyr::filter(TotalFare > threshold) # filtering join # keeps rows meeting threshold from above * dplyr::semi_join( filtered, daily_amounts, by=c('Date', 'Date') ) |> * write_dataset(output, format='csv') return(output) } ``` ??? - Read in all the split CSVs: open_dataset() - sum fares by day - semi_join() is a filtering join - keep only records that belong to days where total exceed certain amount - Write back out to a file - Only parts of the data ever make it into memory --- class: middle ```r processed <- filter_by_cash( data='/data/public_data/taxis/splits/filtered', time_col=tpep_pickup_datetime, amount_col=total_amount, threshold=6000000, output='/data/public_data/taxis/splits/processed' ) ``` ```r dir('/data/public_data/taxis/splits/processed') ``` ``` ## [1] "part-0.csv" ``` ??? - Happened to write on file but we could write multiple --- class: section # Load to Database ??? - Now that data is clean - Put into a database --- class: center, middle # Docker ??? 
- Docker to hold database --- class: middle ```yaml version: '2.1' services: etl-db: # image with GIS capabilities image: kartoza/postgis:11.0-2.5 volumes: # stores data on disk, not in container - /data/public_data/taxis/db:/var/lib/postgresql # this script creates a table we'll write to - ./database/create_main_table.sql:/docker-entrypoint-initdb.d/create_main_table.sql container_name: etl-loader environment: - POSTGRES_DB=loader - POSTGRES_USER=docker - POSTGRES_PASS=docker - RECREATE_DATADIR=TRUE - ALLOW_IP_RANGE=0.0.0.0/0 # Add extensions you need to be enabled by default in the DB. - POSTGRES_MULTIPLE_EXTENSIONS=postgis,hstore,postgis_topology,ogr_fdw ports: # in case you have multiple machines running - 5436:5432 restart: on-failure healthcheck: test: "exit 0" ``` ??? - Uses prebuilt container: kartoza/postgis - Volume: data survives reboots - SQL files - Create really secure password - Includes GIS extensions - Port --- class: middle ```sql DROP TABLE IF EXISTS main_table; CREATE UNLOGGED TABLE IF NOT EXISTS main_table ( tpep_pickup_datetime timestamp with time zone , tpep_dropoff_datetime timestamp with time zone , passenger_count int , trip_distance double precision , pickup_longitude double precision , pickup_latitude double precision , dropoff_longitude double precision , dropoff_latitude double precision , payment_type int , fare_amount double precision , extra double precision , mta_tax double precision , tip_amount double precision , tolls_amount double precision , improvement_surcharge double precision , total_amount double precision , "date" date , the_geom geometry , CONSTRAINT enforce_dims_the_geom CHECK (st_ndims (the_geom) = 2) , CONSTRAINT enforce_geotype_geom CHECK (geometrytype (the_geom) = 'POINT'::text OR the_geom IS NULL) , CONSTRAINT enforce_srid_the_geom CHECK (st_srid (the_geom) = 4326) ); -- make postgres the owner ALTER TABLE main_table OWNER TO postgres; ``` ??? - Creates table - Ensures we have a geometry column - Sets lat/long - No indexes yet - Add after data load - Spin up container --- class: middle ```r docker_up <- processx::run( 'docker-compose', args=c( '-f', here::here('Talks', 'MediumData', 'project', 'docker-compose.yml') , 'up' , '-d' ) ) ``` ??? - Spins up container - Builds tables - -f let's me specify the compose file - -d means detached --- class: center, middle # Copy from CSV to Table ??? - Now we ingest the data --- class: middle ```text BEGIN; -- bulk loading is faster if we drop the indexes and rebuild after DROP INDEX IF EXISTS main_table_the_geom_gist; DROP INDEX IF EXISTS main_table_tpep_pickup_datetime; DELETE FROM main_table; \copy main_table ( tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,pickup_longitude, pickup_latitude,dropoff_longitude,dropoff_latitude,payment_type,fare_amount, extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge, total_amount,the_geom,date) from /data/public_data/taxis/splits/processed/part-0.csv CSV HEADER NULL ''; CREATE INDEX main_table_the_geom_gist ON main_table USING gist ( the_geom); CREATE INDEX main_table_tpep_pickup_datetime ON main_table ( tpep_pickup_datetime); COMMIT; ``` ??? - Drop indexes for speed - Delete current data - Copy in new data - Really fast - this has to be on a single line of code - Copying from CSV with headers and blanks as NULL - Build indexes - All part of a transaction - Unrolls if any part fails --- class: middle ```text [MediumData] host=localhost port=5436 user=docker password=docker dbname=loader ``` ??? 
- We can't include a password in our next step
- In ~/.pg_service.conf
- Or in $PGPASSWORD
- Other connection info for database

---
class: middle

```r
write_result <- processx::run(
  "psql",
  c(
*    "service=MediumData",
    "-f", here::here('Talks', 'MediumData', 'project', 'copy_to_db.txt')
  )
)
```

???

- Get instructions from file we wrote earlier
- service
- All database connection info stored in service config file
- Including password
- -f provides a file of commands

---
class: section

# Orchestration

???

- Run all those pieces smartly
- In order
- Don't repeat steps
- Worried about memory

---
class: middle, center

# [`{targets}`](https://cran.r-project.org/web/packages/targets/index.html)

???

- Put it all together
- Only run steps that need to be rerun
- No wasting computation
- Builds the DAG
- Moves objects in and out of memory
- Very reproducible
- Changed my workflow entirely

---
class: middle

```r
fs::dir_tree(here::here('Talks', 'MediumData', 'project'))
```

```
## /home/jared/consulting/talks/Talks/MediumData/project
## ├── R
## │   └── functions.r
## ├── _targets.R
## ├── _targets.yaml
## ├── copy_to_db.txt
## ├── database
## │   └── create_main_table.sql
## └── docker-compose.yml
```

???

- Functions go in functions.r
- main file is _targets.R: specifies steps
- Config in _targets.yaml
- database instructions in copy_to_db.txt: for copying to database
- database files in database/: for building database
- docker-compose.yml: Building docker container
- Focus on _targets.R

---
class: middle

```r
library(targets)
library(tarchetypes)

source('R/functions.r')

list(
  ...
)
```

???

- Load {targets} package
- {tarchetypes} has helper functions
- Functions go in a file
- All steps go inside the list()
- Back at the beginning

---
class: middle, center

# Getting the Data

???

- Get the data from S3 only if it's new

---
class: middle

```r
list(
*  tar_force(
    latest_file_info,
    aws.s3::get_bucket_df(bucket='nyc-tlc', prefix='trip') |>
      # only focusing on one file
      dplyr::filter(stringr::str_detect(Key, 'yellow_tripdata_2015-01.csv')),
    force=TRUE
  )
*  , tar_file(
    raw_data,
    # downloads file
    # returns filename
    aws.s3::save_object(
      object=latest_file_info$Key[1],
      bucket='nyc-tlc',
      file='/data/public_data/taxis/yellow_tripdata_2015-01.csv'
    )
  )
  , ...
)
```

???

- tar_force() runs every time
- Always checks the file info
- Sees if the data in the bucket changed
- Downloads data if that changes
- There should be a better way of checking; maybe there is
- tar_file() reruns if the bucket changed
- Returns name of local file

---
class: middle

```r
tar_visnetwork(names=raw_data, targets_only=TRUE)
```
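
The bucket check itself could likely be gentler than `tar_force()`. One option, sketched here rather than taken from the project, is to key the download on the object's ETag from the bucket listing, so `raw_data` only rebuilds when the object in the bucket actually changes:

```r
# a sketch, not the pipeline as written: rebuild the download
# only when the S3 object's ETag changes
tarchetypes::tar_change(
  raw_data,
  aws.s3::save_object(
    object=latest_file_info$Key[1],
    bucket='nyc-tlc',
    file='/data/public_data/taxis/yellow_tripdata_2015-01.csv'
  ),
  change=latest_file_info$ETag[1]
)
```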
???

- Three steps
- tar_force() runs every time
- All downstream targets now look outdated
- Even if up to date

---
class: middle, center

# Splitting the Data

???

- Break it up into a bunch of files
- Wrote function

---
class: middle

```r
list(
  ...
*  , tar_target(
    raw_files,
    split_file(
      raw_data,
      dest_folder='/data/public_data/taxis/splits/raw'
    )
  )
  , tar_target(
    file_name_mapping,
    tibble::tibble(
      infile=raw_files,
      outfile=stringr::str_replace(infile, 'raw', 'filtered')
    ) |>
      dplyr::rowwise() |>
      targets::tar_group(),
*    iteration='group'
  )
  , ...
)
```

???

- Split up the files
- raw_files is a vector of file names
- Keep track of the names since we'll be working with each file separately
- Group used later to process files

---
class: middle

```r
tar_visnetwork(names=file_name_mapping, targets_only=TRUE)
```
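
After a build, the grouping can be inspected to confirm that each row will become its own branch downstream; `targets::tar_group()` adds a `tar_group` column for exactly this purpose. A quick check, not a pipeline step:

```r
# read the stored mapping back and look at the tar_group column
targets::tar_read(file_name_mapping) |>
  head()
```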
??? - DAG is more built out - Only split the file if it has been downloaded and changed --- class: middle, center # Map of Desired Area ??? - Need the map data - But only if it changes --- class: middle ```r list( ... * , tar_url( nyc_geo_file, file.path( 'https://services5.arcgis.com/GfwWNkhOj9bNBqoJ/arcgis/rest', 'services/NYC_Community_Districts_Water_Included/FeatureServer/0/', 'query?where=1=1&outSR=4326&f=pgeojson' ) ) , tar_target( nyc, sf::read_sf(nyc_geo_file) |> dplyr::mutate(Boro = BoroCD %/% 100) ) , ... ) ``` ??? - tar_url() checks if the file on the internet has changed --- class: middle ```r tar_visnetwork(names=nyc, targets_only=TRUE) ```
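
The `BoroCD %/% 100` trick works because the borough is the hundreds digit of the community district code. A tiny illustration with illustrative district codes:

```r
# integer division keeps only the hundreds digit:
# 1xx = Manhattan, 2xx = Bronx, 3xx = Brooklyn, 4xx = Queens, 5xx = Staten Island
c(101, 206, 301, 414, 503) %/% 100
```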
??? - DAG for just the map --- class: middle ```r tar_visnetwork(names=c(nyc, file_name_mapping), targets_only=TRUE) ```
???

- Notice they are disconnected
- Connect next

---
class: middle, center

# Spatial Filter

???

- Remove rows based on geography

---
class: middle

```r
list(
  ...
  # get existing file names in the folder so we can delete them
  , tar_force(
    filtered_files_list,
    dir('/data/public_data/taxis/splits/filtered', full.names=TRUE),
    force=TRUE
  )
  , tar_target(
    remove_filtered_files,
    if(length(filtered_files_list) > 0 && !anyNA(filtered_files_list))
      file.remove(filtered_files_list)
  )
*  , tar_file(
    filtered_files,
    {
      # makes sure this happens after deletion
      # it only deletes once, regardless of mapping
      remove_filtered_files
      remove_points(
*        in_file=file_name_mapping$infile,
*        out_file=file_name_mapping$outfile,
        header_file=file.path('/data', 'public_data', 'taxis', 'splits', 'raw', 'header.csv'),
        geo_bounds=nyc
      )
    },
*    pattern=map(file_name_mapping)
  )
  , ...
)
```

???

- Make list of filtered files
- Delete existing files
- Process each of the split files
- map(file_name_mapping) does each file separately
- Can be done in parallel
- Curly braces make the step artificially depend on other steps

---
class: middle

```r
tar_visnetwork(names=filtered_files, targets_only=TRUE)
```
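
Once the pipeline has run, a single branch of the pattern can be read back to spot-check the output; each branch of `filtered_files` is just the path of one filtered CSV:

```r
# not a pipeline step -- look at the file path produced by the first branch
targets::tar_read(filtered_files, branches=1)
```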
??? - Removing old files - input/output list - nyc map - All feed into filtered files step - filtered_files is square - Because it iterates over a pattern --- class: middle, center # Filter Rows Based on Aggregation ??? - Filtering rows based on group aggregates --- class: middle ```r list( ... * , tar_change( processed_file, filter_by_cash( data='/data/public_data/taxis/splits/filtered', time_col=tpep_pickup_datetime, amount_col=total_amount, threshold=600000, output='/data/public_data/taxis/splits/processed' ), * change=filtered_files, * format='file', * packages=c('arrow'), deployment='main' ) , ... ) ``` ??? - Runs if the filtered files change - Not referenced in expression - Telling targets to load {arrow} for this step - format='file' will track changes in this file for downstream targets - {arrow} doesn't play nicely with {targets} so you need to create and destroy the {arrow} object within a single target - Even harder with schemas - We'll get to deployment='main' --- class: middle ```r tar_visnetwork(names=processed_file, targets_only=TRUE) ```
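
The "create and destroy the {arrow} object within a single target" advice amounts to never letting an Arrow Dataset leave the target: build it, use it, and return only a file path. A minimal sketch of that shape (the function name here is illustrative, not from the project):

```r
# Arrow objects live and die inside the function; only a plain path is
# returned, so {targets} never has to serialize an external pointer
filter_step <- function(in_dir, out_dir) {
  ds <- arrow::open_dataset(in_dir, format='csv')   # created here
  arrow::write_dataset(ds, out_dir, format='csv')   # consumed here
  out_dir                                           # only the path leaves the target
}
```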
??? - processed_file runs if filtered_files changed --- class: middle, center # Docker ??? - Launch Docker from within {targets} --- class: middle ```r list( ... , tar_target( docker_up, list( * up=processx::run( 'docker-compose', args=c( '-f', here::here('Talks', 'MediumData', 'project', 'docker-compose.yml') , 'up' , '-d' ) ), # we will look to see if this changes * time_created=processx::run( * "docker", * args=c("container", "inspect", "-f", "'{{ .Created }}'", "etl-loader") ) ) ) , ... ) ``` ??? - Spins up docker container - list of two commands - any valid R object - Use processx::run() for a lot of command line work in this project - Keeps track of when container was started - Will be used later --- class: middle ```r tar_visnetwork(names=docker_up, targets_only=TRUE) ```
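
If you want to confirm the database is actually up before anything talks to it, the container's state can be checked the same way its creation time is captured. A sketch, using the container name from docker-compose.yml:

```r
# not part of the pipeline -- ask Docker whether the container is running
status <- processx::run(
  'docker',
  args=c('container', 'inspect', '-f', '{{ .State.Status }}', 'etl-loader')
)
trimws(status$stdout)  # "running" if the database is available
```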
??? - Not connected to anything yet --- class: middle ```r tar_visnetwork(names=c(docker_up, processed_file), targets_only=TRUE) ```
??? - docker_up is all by itself --- class: middle, center # Copy from CSV to Table ??? - Get data into table --- class: middle ```r list( ... , tar_change( load_data, processx::run( "psql", c( * "service=MediumData", "-f", here::here('Talks', 'MediumData', 'project', 'copy_to_db.txt') ) ), # depends on both docker and the processed file * change=list(docker_up$time_created, processed_file) ) ) ``` ??? - Command has no dependencies - This runs if either the docker container is new or the processed file changed - Database info saved in config file --- class: middle ```r tar_visnetwork(names=load_data, targets_only=TRUE) ```
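
After `load_data` runs, the same service entry can be reused for a quick confirmation that rows actually landed in the table. A sanity check, not a pipeline step:

```r
# count the rows that were copied into Postgres
processx::run(
  'psql',
  c('service=MediumData', '-c', 'SELECT count(*) FROM main_table;')
)
```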
??? - docker_up is connected - Complete DAG - All parts connected --- class: middle ```r tar_visnetwork(names=load_data, targets_only=FALSE) ```
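
Before kicking everything off, {targets} can report what it plans to do and what it currently considers stale. Neither call is part of _targets.R; they are just useful checks:

```r
tar_manifest()   # one row per target, with the command it will run
tar_outdated()   # names of the targets that would rebuild on the next run
```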
??? - Show custom functions too - Triangles --- class: middle, center # Run Everything ??? - Run the whole pipeline - At least whatever needs updating --- class: middle ```r tar_make() ``` ??? - tar_make() runs any steps that need to be run --- class: middle, center # Now In Parallel ??? - Take advantage of the split files --- class: middle ```r dir('/data/public_data/taxis/splits/raw/') |> head(n=10) ``` ``` ## [1] "0000000001.csv" "0000000002.csv" "0000000003.csv" "0000000004.csv" ## [5] "0000000005.csv" "0000000006.csv" "0000000007.csv" "0000000008.csv" ## [9] "0000000009.csv" "0000000010.csv" ``` ??? - We have all those files - Need geofiltering individually --- class: middle ```r list( ... , tar_file( filtered_files, { # makes sure this happens after deletion # it only deletes once, regardless of mapping remove_filtered_files remove_points( in_file=file_name_mapping$infile, out_file=file_name_mapping$outfile, header_file=file.path('/data', 'public_data', 'taxis', 'splits', 'raw', 'header.csv'), geo_bounds=nyc ) }, * pattern=map(file_name_mapping) ) , ... ) ``` ??? - map() will run remove_points() on each file separately - In parallel --- class: middle ```r list( ... , tar_change( processed_file, filter_by_cash( data='/data/public_data/taxis/splits/filtered', time_col=tpep_pickup_datetime, amount_col=total_amount, threshold=600000, output='/data/public_data/taxis/splits/processed' ), change=filtered_files, format='file', packages=c('arrow'), * deployment='main' ) , ... ) ``` ??? - {arrow} runs in parallel implicitly - Race condition with other parallel steps (internal or between) - Want to keep both - deployment='main' runs this step alone - So it can run parallel itself --- class: middle ```r library(targets) library(tarchetypes) source('functions.r') *future::plan(future::multisession, workers=4) list( ... ) ``` ??? - Define parallel plan in _targets.R file - Everything will be automatic - Including targets that use parallelism inside them - like future_map() - Though I haven't had success - Instead of tar_make()... --- class: middle ```r tar_make_future() ``` ??? - Changing tar_make() to tar_make_future() makes everything run in parallel - Multiple machines --- background-image: url("data:image/png;base64,#/home/jared/consulting/talks/images/road_with_lights.jpg") background-size: cover ??? - Just go along from there --- class: section # What Tools Did we Employ? ??? - What did we do? --- class: middle, center ### [`{aws.s3}`](https://cran.r-project.org/web/packages/aws.s3/index.html) for Cloud Storage ??? - Data hosted on S3 - Could push results back to S3 - Could use competing S3 --- class: middle, center ### Command Line for Splitting and Preprocessing Files ??? - We split our files into hundreds of files for parallel processing - Can also clean bad data - Database and docker commands --- class: middle, center ### [`{data.table}`](https://cran.r-project.org/web/packages/data.table/index.html) for Reading and Writing Data ??? - fread() and fwrite() are fast - And safe - Had consistency issues with other packages --- class: middle, center ### [`{dplyr}`](https://cran.r-project.org/web/packages/dplyr/index.html) and [`{purrr}`](https://cran.r-project.org/web/packages/purrr/index.html) for Data Manipulation ??? - Go to data manipulation --- class: center, middle ### [`{sf}`](https://cran.r-project.org/web/packages/sf/index.html) for Geospatial Filtering ??? 
- Spatial filters - And so much more --- class: middle, center ### [`{arrow}`](https://cran.r-project.org/web/packages/arrow/index.html) for Operating on Out of Memory Data ??? - When data are too big for memory - Can use regular {dplyr} syntax --- class: middle, center ### Postgres and PostGIS for Holding the Finished Data ??? - Others can access with ordinary SQL tools - Can do spatial queries in SQL --- class: middle, center ### Docker for Isolating the Database ??? - Have a different container for each database project - Easy to spin up and tear down - No fear of breaking things --- class: middle, center ### [`{targets}`](https://cran.r-project.org/web/packages/targets/index.html) for Orchestrating ??? - Puts everything together - Builds the DAG - Only run steps that need to be renewed - Easier than Airflow - All R-based --- class: center, middle
??? - Summation of whole project --- class: part # Thank You ??? - Thank you