This lecture note is based on Dr. Hua Zhou’s 2018 Winter Statistical Computing course notes available at http://hua-zhou.github.io/teaching/biostatm280-2018winter/index.html.
sessionInfo()
## R version 3.3.3 (2017-03-06)
## Platform: x86_64-pc-linux-gnu (64-bit)
## Running under: Debian GNU/Linux 9 (stretch)
##
## locale:
## [1] LC_CTYPE=en_US.UTF-8 LC_NUMERIC=C
## [3] LC_TIME=en_US.UTF-8 LC_COLLATE=en_US.UTF-8
## [5] LC_MONETARY=en_US.UTF-8 LC_MESSAGES=en_US.UTF-8
## [7] LC_PAPER=en_US.UTF-8 LC_NAME=C
## [9] LC_ADDRESS=C LC_TELEPHONE=C
## [11] LC_MEASUREMENT=en_US.UTF-8 LC_IDENTIFICATION=C
##
## attached base packages:
## [1] stats graphics grDevices utils datasets methods base
##
## loaded via a namespace (and not attached):
## [1] backports_1.1.2 magrittr_1.5 rprojroot_1.3-2 tools_3.3.3
## [5] htmltools_0.3.6 yaml_2.2.0 Rcpp_1.0.0 stringi_1.2.4
## [9] rmarkdown_1.10 knitr_1.20 stringr_1.3.1 digest_0.6.18
## [13] evaluate_0.12
In this tutorial, we do some exploratory data analysis of the distributed data in a Hadoop YARN cluster. Again it is adapted from the tutorial at RStudio.
http://35.187.203.86:8787
.
tidyverse
, DBI
, sparklyr
should be already installed globally.# Connect to Spark
library(sparklyr)
library(dplyr)
##
## Attaching package: 'dplyr'
## The following objects are masked from 'package:stats':
##
## filter, lag
## The following objects are masked from 'package:base':
##
## intersect, setdiff, setequal, union
library(ggplot2)
Sys.setenv(SPARK_HOME="/usr/lib/spark")
config <- spark_config()
sc <- spark_connect(master = "yarn-client", config = config)
#sc <- spark_connect(master = "yarn", config = config)
sc
## $master
## [1] "yarn-client"
##
## $method
## [1] "shell"
##
## $app_name
## [1] "sparklyr"
##
## $config
## $config$spark.env.SPARK_LOCAL_IP.local
## [1] "127.0.0.1"
##
## $config$sparklyr.connect.csv.embedded
## [1] "^1.*"
##
## $config$spark.sql.catalogImplementation
## [1] "hive"
##
## $config$sparklyr.connect.cores.local
## [1] 4
##
## $config$spark.sql.shuffle.partitions.local
## [1] 4
##
## attr(,"config")
## [1] "default"
## attr(,"file")
## [1] "/usr/local/lib/R/site-library/sparklyr/conf/config-template.yml"
##
## $state
## <environment: 0x559b72383cc8>
##
## $spark_home
## [1] "/usr/lib/spark"
##
## $backend
## description class mode
## "->localhost:8881" "sockconn" "wb"
## text opened can read
## "binary" "opened" "yes"
## can write
## "yes"
##
## $monitoring
## description class mode
## "->localhost:8881" "sockconn" "wb"
## text opened can read
## "binary" "opened" "yes"
## can write
## "yes"
##
## $gateway
## description class mode
## "->localhost:8880" "sockconn" "rb"
## text opened can read
## "binary" "opened" "yes"
## can write
## "yes"
##
## $output_file
## [1] "/tmp/Rtmp99VppQ/file14ef7261820b_spark.log"
##
## $sessionId
## [1] 40584
##
## attr(,"class")
## [1] "spark_connection" "spark_shell_connection"
## [3] "DBIConnection"
Create dplyr reference to the Spark DataFrame.
# Cache flights Hive table into Spark
#tbl_cache(sc, 'flights')
flights_tbl <- tbl(sc, 'flights')
flights_tbl %>% print(width = Inf)
## # Source: spark<flights> [?? x 29]
## year month dayofmonth dayofweek deptime crsdeptime arrtime crsarrtime
## * <int> <int> <int> <int> <int> <int> <int> <int>
## 1 NA NA NA NA NA NA NA NA
## 2 1987 10 14 3 741 730 912 849
## 3 1987 10 15 4 729 730 903 849
## 4 1987 10 17 6 741 730 918 849
## 5 1987 10 18 7 729 730 847 849
## 6 1987 10 19 1 749 730 922 849
## 7 1987 10 21 3 728 730 848 849
## 8 1987 10 22 4 728 730 852 849
## 9 1987 10 23 5 731 730 902 849
## 10 1987 10 24 6 744 730 908 849
## # ... with more rows, and 21 more variables: uniquecarrier <chr>,
## # flightnum <int>, tailnum <chr>, actualelapsedtime <int>,
## # crselapsedtime <int>, airtime <chr>, arrdelay <int>, depdelay <int>,
## # origin <chr>, dest <chr>, distance <int>, taxiin <chr>, taxiout <chr>,
## # cancelled <int>, cancellationcode <chr>, diverted <int>,
## # carrierdelay <chr>, weatherdelay <chr>, nasdelay <chr>,
## # securitydelay <chr>, lateaircraftdelay <chr>
# Cache airlines Hive table into Spark
#tbl_cache(sc, 'airlines')
airlines_tbl <- tbl(sc, 'airlines')
airlines_tbl %>% print(width = Inf)
## # Source: spark<airlines> [?? x 2]
## code description
## * <chr> <chr>
## 1 Code Description
## 2 02Q Titan Airways
## 3 04Q Tradewind Aviation
## 4 05Q Comlux Aviation, AG
## 5 06Q Master Top Linhas Aereas Ltd.
## 6 07Q Flair Airlines Ltd.
## 7 09Q Swift Air, LLC d/b/a Eastern Air Lines d/b/a Eastern
## 8 0BQ DCA
## 9 0CQ ACM AIR CHARTER GmbH
## 10 0FQ Maine Aviation Aircraft Charter, LLC
## # ... with more rows
# Cache airports Hive table into Spark
#tbl_cache(sc, 'airports')
airports_tbl <- tbl(sc, 'airports')
airports_tbl %>% print(width = Inf)
## # Source: spark<airports> [?? x 12]
## id name city country faa icao lat lon alt tz_offset dst
## * <chr> <chr> <chr> <chr> <chr> <chr> <chr> <chr> <chr> <chr> <chr>
## 1 1 Goro… Goro… Papua … GKA AYGA -6.0… 145.… 5282 10 U
## 2 2 Mada… Mada… Papua … MAG AYMD -5.2… 145.… 20 10 U
## 3 3 Moun… Moun… Papua … HGU AYMH -5.8… 144.… 5388 10 U
## 4 4 Nadz… Nadz… Papua … LAE AYNZ -6.5… 146.… 239 10 U
## 5 5 Port… Port… Papua … POM AYPY -9.4… 147.… 146 10 U
## 6 6 Wewa… Wewak Papua … WWK AYWK -3.5… 143.… 19 10 U
## 7 7 Nars… Nars… Greenl… UAK BGBW 61.1… -45.… 112 -3 E
## 8 8 Godt… Godt… Greenl… GOH BGGH 64.1… -51.… 283 -3 E
## 9 9 Kang… Sond… Greenl… SFJ BGSF 67.0… -50.… 165 -3 E
## 10 10 Thul… Thule Greenl… THU BGTL 76.5… -68.… 251 -4 E
## # ... with more rows, and 1 more variable: tz_name <chr>
Across whole data set using dplyr
:
system.time({
out <- flights_tbl %>%
group_by(year) %>%
count() %>%
arrange(year) %>%
collect()
})
## user system elapsed
## 0.096 0.000 64.988
out
## # A tibble: 23 x 2
## # Groups: year [23]
## year n
## <int> <dbl>
## 1 NA 22
## 2 1987 1311826
## 3 1988 5202096
## 4 1989 5041200
## 5 1990 5270893
## 6 1991 5076925
## 7 1992 5092157
## 8 1993 5070501
## 9 1994 5180048
## 10 1995 5327435
## # ... with 13 more rows
out %>% ggplot(aes(x = year, y = n)) + geom_col()
## Warning: Removed 1 rows containing missing values (position_stack).
How many flights from JFK (New York) per year:
system.time({
out <- flights_tbl %>%
filter(origin == "JFK") %>%
group_by(year) %>%
count() %>%
arrange(year) %>%
collect()
})
## user system elapsed
## 0.092 0.000 41.842
out
## # A tibble: 22 x 2
## # Groups: year [22]
## year n
## <int> <dbl>
## 1 1987 12273
## 2 1988 51485
## 3 1989 48606
## 4 1990 47153
## 5 1991 42235
## 6 1992 42454
## 7 1993 41842
## 8 1994 43116
## 9 1995 46187
## 10 1996 44064
## # ... with 12 more rows
out %>% ggplot(aes(x = year, y = n)) +
geom_col() +
labs(title = "Number of flights from JFK")
How many flights to JFK per year:
system.time({
out <- flights_tbl %>%
filter(dest == "JFK") %>%
group_by(year) %>%
count() %>%
arrange(year) %>%
collect()
})
## user system elapsed
## 0.068 0.000 31.899
out
## # A tibble: 22 x 2
## # Groups: year [22]
## year n
## <int> <dbl>
## 1 1987 12317
## 2 1988 52086
## 3 1989 49714
## 4 1990 47689
## 5 1991 42449
## 6 1992 42649
## 7 1993 41895
## 8 1994 43276
## 9 1995 46194
## 10 1996 44064
## # ... with 12 more rows
out %>% ggplot(aes(x = year, y = n)) +
geom_col() +
labs(title = "Number of flights to JFK")
Suppose we want to fit a linear regression of gain
(depdelay
- arrdelay
) on distance, departure delay, and carriers using data from 2003-2007.
# Filter records and create target variable 'gain'
system.time(
model_data <- flights_tbl %>%
filter(!is.na(arrdelay) & !is.na(depdelay) & !is.na(distance)) %>%
filter(depdelay > 15 & depdelay < 240) %>%
filter(arrdelay > -60 & arrdelay < 360) %>%
filter(year >= 2003 & year <= 2007) %>%
left_join(airlines_tbl, by = c("uniquecarrier" = "code")) %>%
mutate(gain = depdelay - arrdelay) %>%
select(year, month, arrdelay, depdelay, distance, uniquecarrier, description, gain)
)
## user system elapsed
## 0.008 0.000 0.005
model_data
## # Source: spark<?> [?? x 8]
## year month arrdelay depdelay distance uniquecarrier description gain
## * <int> <int> <int> <int> <int> <chr> <chr> <int>
## 1 2003 1 23 29 837 UA United Air … 6
## 2 2003 1 52 18 1835 UA United Air … -34
## 3 2003 1 64 82 413 UA United Air … 18
## 4 2003 1 54 38 413 UA United Air … -16
## 5 2003 1 5 17 413 UA United Air … 12
## 6 2003 1 132 126 413 UA United Air … -6
## 7 2003 1 7 29 1723 UA United Air … 22
## 8 2003 1 62 77 1723 UA United Air … 15
## 9 2003 1 148 135 1723 UA United Air … -13
## 10 2003 1 101 97 1723 UA United Air … -4
## # ... with more rows
# Summarize data by carrier
model_data %>%
group_by(uniquecarrier) %>%
summarize(description = min(description), gain = mean(gain),
distance = mean(distance), depdelay = mean(depdelay)) %>%
select(description, gain, distance, depdelay) %>%
arrange(gain)
## Warning: Missing values are always removed in SQL.
## Use `MIN(x, na.rm = TRUE)` to silence this warning
## Warning: Missing values are always removed in SQL.
## Use `AVG(x, na.rm = TRUE)` to silence this warning
## Warning: Missing values are always removed in SQL.
## Use `AVG(x, na.rm = TRUE)` to silence this warning
## Warning: Missing values are always removed in SQL.
## Use `AVG(x, na.rm = TRUE)` to silence this warning
## Warning: Missing values are always removed in SQL.
## Use `MIN(x, na.rm = TRUE)` to silence this warning
## Warning: Missing values are always removed in SQL.
## Use `AVG(x, na.rm = TRUE)` to silence this warning
## Warning: Missing values are always removed in SQL.
## Use `AVG(x, na.rm = TRUE)` to silence this warning
## Warning: Missing values are always removed in SQL.
## Use `AVG(x, na.rm = TRUE)` to silence this warning
## Warning: Missing values are always removed in SQL.
## Use `MIN(x, na.rm = TRUE)` to silence this warning
## Warning: Missing values are always removed in SQL.
## Use `AVG(x, na.rm = TRUE)` to silence this warning
## Warning: Missing values are always removed in SQL.
## Use `AVG(x, na.rm = TRUE)` to silence this warning
## Warning: Missing values are always removed in SQL.
## Use `AVG(x, na.rm = TRUE)` to silence this warning
## Warning: Missing values are always removed in SQL.
## Use `MIN(x, na.rm = TRUE)` to silence this warning
## Warning: Missing values are always removed in SQL.
## Use `AVG(x, na.rm = TRUE)` to silence this warning
## Warning: Missing values are always removed in SQL.
## Use `AVG(x, na.rm = TRUE)` to silence this warning
## Warning: Missing values are always removed in SQL.
## Use `AVG(x, na.rm = TRUE)` to silence this warning
## # Source: spark<?> [?? x 4]
## # Ordered by: gain
## description gain distance depdelay
## * <chr> <dbl> <dbl> <dbl>
## 1 ATA Airlines d/b/a ATA -3.35 1135. 56.1
## 2 ExpressJet Airlines Inc. (1) -3.03 520. 59.4
## 3 Envoy Air -2.54 416. 53.1
## 4 Northwest Airlines Inc. -2.20 779. 48.5
## 5 Delta Air Lines Inc. -1.82 868. 50.8
## 6 AirTran Airways Corporation -1.43 642. 55.0
## 7 Continental Air Lines Inc. -0.962 1117. 57.0
## 8 American Airlines Inc. -0.886 1074. 55.5
## 9 Endeavor Air Inc. -0.639 467. 58.5
## 10 JetBlue Airways -0.326 1139. 54.1
## # ... with more rows
sparklyr
provides interface for the Spark MLlib library for machine learning. RStudio cheat sheet provides a good start point.
Predict time gained or lost in flight as a function of distance, departure delay, and airline carrier.
# Partition the data into training and validation sets
model_partition <- model_data %>%
sdf_partition(train = 0.8, valid = 0.2, seed = 5555)
# Fit a linear model
system.time(
ml1 <- model_partition$train %>%
ml_linear_regression(gain ~ distance + depdelay + uniquecarrier)
)
## user system elapsed
## 1.056 0.016 321.443
# Summarize the linear model
summary(ml1)
## Deviance Residuals (approximate):
## Min 1Q Median 3Q Max
## -270.308 -5.579 2.640 9.721 220.210
##
## Coefficients:
## (Intercept) distance depdelay uniquecarrier_WN
## 1.744310610 0.003265998 -0.014662325 0.876124221
## uniquecarrier_AA uniquecarrier_MQ uniquecarrier_UA uniquecarrier_DL
## -5.314241548 -4.870711724 -3.670680336 -5.676217564
## uniquecarrier_US uniquecarrier_OO uniquecarrier_EV uniquecarrier_NW
## -3.603629208 -2.367797953 -1.052889012 -5.783125644
## uniquecarrier_XE uniquecarrier_CO uniquecarrier_OH uniquecarrier_FL
## -5.584317607 -5.513917180 -2.152535205 -4.454803437
## uniquecarrier_AS uniquecarrier_DH uniquecarrier_YV uniquecarrier_B6
## -2.067187427 -0.754864480 0.123395029 -4.945583357
## uniquecarrier_HP uniquecarrier_9E uniquecarrier_F9 uniquecarrier_TZ
## -0.894187818 -2.987736373 -3.885617746 -7.986040267
## uniquecarrier_HA
## -3.132802810
##
## R-Squared: 0.02385
## Root Mean Squared Error: 17.74
Compare the model performance using the validation data.
# Calculate average gains by predicted decile
system.time(
model_deciles <- lapply(model_partition, function(x) {
#sdf_predict(ml1, x) %>%
ml_predict(ml1, x) %>%
mutate(decile = ntile(desc(prediction), 10)) %>%
group_by(decile) %>%
summarize(gain = mean(gain)) %>%
select(decile, gain) %>%
collect()
})
)
## Warning: Missing values are always removed in SQL.
## Use `AVG(x, na.rm = TRUE)` to silence this warning
## Warning: Missing values are always removed in SQL.
## Use `AVG(x, na.rm = TRUE)` to silence this warning
## user system elapsed
## 0.284 0.004 110.052
model_deciles
## $train
## # A tibble: 10 x 2
## decile gain
## <int> <dbl>
## 1 1 6.02
## 2 2 2.48
## 3 3 1.71
## 4 4 1.07
## 5 5 0.337
## 6 6 -0.343
## 7 7 -1.23
## 8 8 -2.25
## 9 9 -2.70
## 10 10 -3.78
##
## $valid
## # A tibble: 10 x 2
## decile gain
## <int> <dbl>
## 1 1 5.97
## 2 2 2.51
## 3 3 1.64
## 4 4 1.06
## 5 5 0.385
## 6 6 -0.498
## 7 7 -1.15
## 8 8 -2.27
## 9 9 -2.71
## 10 10 -3.83
# Create a summary dataset for plotting
deciles <- rbind(
data.frame(data = 'train', model_deciles$train),
data.frame(data = 'valid', model_deciles$valid),
make.row.names = FALSE
)
deciles
## data decile gain
## 1 train 1 6.0197657
## 2 train 2 2.4794602
## 3 train 3 1.7137775
## 4 train 4 1.0696767
## 5 train 5 0.3367499
## 6 train 6 -0.3434529
## 7 train 7 -1.2306961
## 8 train 8 -2.2462305
## 9 train 9 -2.7020457
## 10 train 10 -3.7753330
## 11 valid 1 5.9682166
## 12 valid 2 2.5054524
## 13 valid 3 1.6396994
## 14 valid 4 1.0643781
## 15 valid 5 0.3852696
## 16 valid 6 -0.4975022
## 17 valid 7 -1.1484578
## 18 valid 8 -2.2691283
## 19 valid 9 -2.7098733
## 20 valid 10 -3.8327128
# Plot average gains by predicted decile
deciles %>%
ggplot(aes(factor(decile), gain, fill = data)) +
geom_bar(stat = 'identity', position = 'dodge') +
labs(title = 'Average gain by predicted decile', x = 'Decile', y = 'Minutes')
Compare actual gains to predicted gains for an out of time sample.
# Select data from an out of time sample
data_2008 <- flights_tbl %>%
filter(!is.na(arrdelay) & !is.na(depdelay) & !is.na(distance)) %>%
filter(depdelay > 15 & depdelay < 240) %>%
filter(arrdelay > -60 & arrdelay < 360) %>%
filter(year == 2008) %>%
left_join(airlines_tbl, by = c("uniquecarrier" = "code")) %>%
mutate(gain = depdelay - arrdelay) %>%
select(year, month, arrdelay, depdelay, distance, uniquecarrier, description, gain, origin, dest)
data_2008
## # Source: spark<?> [?? x 10]
## year month arrdelay depdelay distance uniquecarrier description gain
## * <int> <int> <int> <int> <int> <chr> <chr> <int>
## 1 2008 1 2 19 810 WN Southwest … 17
## 2 2008 1 34 34 515 WN Southwest … 0
## 3 2008 1 11 25 688 WN Southwest … 14
## 4 2008 1 57 67 1591 WN Southwest … 10
## 5 2008 1 80 94 828 WN Southwest … 14
## 6 2008 1 15 27 1489 WN Southwest … 12
## 7 2008 1 16 28 838 WN Southwest … 12
## 8 2008 1 37 51 220 WN Southwest … 14
## 9 2008 1 19 32 220 WN Southwest … 13
## 10 2008 1 6 20 220 WN Southwest … 14
## # ... with more rows, and 2 more variables: origin <chr>, dest <chr>
# Summarize data by carrier
#carrier <- sdf_predict(ml1, data_2008) %>%
carrier <- ml_predict(ml1, data_2008) %>%
group_by(description) %>%
summarize(gain = mean(gain), prediction = mean(prediction), freq = n()) %>%
filter(freq > 10000) %>%
collect()
## Warning: Missing values are always removed in SQL.
## Use `AVG(x, na.rm = TRUE)` to silence this warning
## Warning: Missing values are always removed in SQL.
## Use `AVG(x, na.rm = TRUE)` to silence this warning
carrier
## # A tibble: 18 x 4
## description gain prediction freq
## <chr> <dbl> <dbl> <dbl>
## 1 Endeavor Air Inc. 0.458 -0.596 35039
## 2 SkyWest Airlines Inc. -1.23 -0.0663 85859
## 3 PSA Airlines Inc. -2.28 0.410 36958
## 4 United Air Lines Inc. 2.02 0.588 97408
## 5 Frontier Airlines Inc. -0.398 0.202 14674
## 6 ExpressJet Airlines Inc. (1) -0.330 -2.81 70037
## 7 Southwest Airlines Co. 4.18 4.01 224655
## 8 ExpressJet Airlines Inc. 0.539 1.32 56247
## 9 Continental Air Lines Inc. 2.36 -0.671 61386
## 10 Northwest Airlines Inc. -2.85 -2.11 49220
## 11 JetBlue Airways -0.975 -0.577 38257
## 12 AirTran Airways Corporation -1.92 -1.14 44867
## 13 US Airways Inc. 2.40 0.468 59418
## 14 Envoy Air -2.54 -2.50 97086
## 15 Alaska Airlines Inc. 1.48 1.95 23806
## 16 Delta Air Lines Inc. -1.42 -1.58 69066
## 17 Mesa Airlines Inc. 0.246 2.25 48208
## 18 American Airlines Inc. -0.688 -0.893 131798
# Plot actual gains and predicted gains by airline carrier
ggplot(carrier, aes(gain, prediction)) +
geom_point(alpha = 0.75, color = 'red', shape = 3) +
geom_abline(intercept = 0, slope = 1, alpha = 0.15, color = 'blue') +
geom_text(aes(label = substr(description, 1, 20)), size = 3, alpha = 0.75, vjust = -1) +
labs(title='Average Gains Forecast', x = 'Actual', y = 'Predicted')
Some carriers make up more time than others in flight, but the differences are relatively small. The average time gains between the best and worst airlines is only six minutes. The best predictor of time gained is not carrier but flight distance. The biggest gains were associated with the longest flights.
spark_disconnect_all()
## [1] 1