Acknowledgment

This lecture note is based on Dr. Hua Zhou’s 2018 Winter Statistical Computing course notes available at http://hua-zhou.github.io/teaching/biostatm280-2018winter/index.html.

sessionInfo()
## R version 3.3.3 (2017-03-06)
## Platform: x86_64-pc-linux-gnu (64-bit)
## Running under: Debian GNU/Linux 9 (stretch)
## 
## locale:
##  [1] LC_CTYPE=en_US.UTF-8       LC_NUMERIC=C              
##  [3] LC_TIME=en_US.UTF-8        LC_COLLATE=en_US.UTF-8    
##  [5] LC_MONETARY=en_US.UTF-8    LC_MESSAGES=en_US.UTF-8   
##  [7] LC_PAPER=en_US.UTF-8       LC_NAME=C                 
##  [9] LC_ADDRESS=C               LC_TELEPHONE=C            
## [11] LC_MEASUREMENT=en_US.UTF-8 LC_IDENTIFICATION=C       
## 
## attached base packages:
## [1] stats     graphics  grDevices utils     datasets  methods   base     
## 
## loaded via a namespace (and not attached):
##  [1] backports_1.1.2 magrittr_1.5    rprojroot_1.3-2 tools_3.3.3    
##  [5] htmltools_0.3.6 yaml_2.2.0      Rcpp_1.0.0      stringi_1.2.4  
##  [9] rmarkdown_1.10  knitr_1.20      stringr_1.3.1   digest_0.6.18  
## [13] evaluate_0.12

In this tutorial, we do some exploratory data analysis of the distributed data in a Hadoop YARN cluster. Again it is adapted from the tutorial at RStudio.

Log in the Hadoop YARN cluster

Connect to Spark

# Connect to Spark
library(sparklyr)
library(dplyr)
## 
## Attaching package: 'dplyr'
## The following objects are masked from 'package:stats':
## 
##     filter, lag
## The following objects are masked from 'package:base':
## 
##     intersect, setdiff, setequal, union
library(ggplot2)
Sys.setenv(SPARK_HOME="/usr/lib/spark")
config <- spark_config()
sc <- spark_connect(master = "yarn-client", config = config)
#sc <- spark_connect(master = "yarn", config = config)
sc
## $master
## [1] "yarn-client"
## 
## $method
## [1] "shell"
## 
## $app_name
## [1] "sparklyr"
## 
## $config
## $config$spark.env.SPARK_LOCAL_IP.local
## [1] "127.0.0.1"
## 
## $config$sparklyr.connect.csv.embedded
## [1] "^1.*"
## 
## $config$spark.sql.catalogImplementation
## [1] "hive"
## 
## $config$sparklyr.connect.cores.local
## [1] 4
## 
## $config$spark.sql.shuffle.partitions.local
## [1] 4
## 
## attr(,"config")
## [1] "default"
## attr(,"file")
## [1] "/usr/local/lib/R/site-library/sparklyr/conf/config-template.yml"
## 
## $state
## <environment: 0x559b72383cc8>
## 
## $spark_home
## [1] "/usr/lib/spark"
## 
## $backend
##        description              class               mode 
## "->localhost:8881"         "sockconn"               "wb" 
##               text             opened           can read 
##           "binary"           "opened"              "yes" 
##          can write 
##              "yes" 
## 
## $monitoring
##        description              class               mode 
## "->localhost:8881"         "sockconn"               "wb" 
##               text             opened           can read 
##           "binary"           "opened"              "yes" 
##          can write 
##              "yes" 
## 
## $gateway
##        description              class               mode 
## "->localhost:8880"         "sockconn"               "rb" 
##               text             opened           can read 
##           "binary"           "opened"              "yes" 
##          can write 
##              "yes" 
## 
## $output_file
## [1] "/tmp/Rtmp99VppQ/file14ef7261820b_spark.log"
## 
## $sessionId
## [1] 40584
## 
## attr(,"class")
## [1] "spark_connection"       "spark_shell_connection"
## [3] "DBIConnection"

Access Hive tables

Create dplyr reference to the Spark DataFrame.

# Cache flights Hive table into Spark
#tbl_cache(sc, 'flights')
flights_tbl <- tbl(sc, 'flights')
flights_tbl %>% print(width = Inf)
## # Source: spark<flights> [?? x 29]
##     year month dayofmonth dayofweek deptime crsdeptime arrtime crsarrtime
##  * <int> <int>      <int>     <int>   <int>      <int>   <int>      <int>
##  1    NA    NA         NA        NA      NA         NA      NA         NA
##  2  1987    10         14         3     741        730     912        849
##  3  1987    10         15         4     729        730     903        849
##  4  1987    10         17         6     741        730     918        849
##  5  1987    10         18         7     729        730     847        849
##  6  1987    10         19         1     749        730     922        849
##  7  1987    10         21         3     728        730     848        849
##  8  1987    10         22         4     728        730     852        849
##  9  1987    10         23         5     731        730     902        849
## 10  1987    10         24         6     744        730     908        849
## # ... with more rows, and 21 more variables: uniquecarrier <chr>,
## #   flightnum <int>, tailnum <chr>, actualelapsedtime <int>,
## #   crselapsedtime <int>, airtime <chr>, arrdelay <int>, depdelay <int>,
## #   origin <chr>, dest <chr>, distance <int>, taxiin <chr>, taxiout <chr>,
## #   cancelled <int>, cancellationcode <chr>, diverted <int>,
## #   carrierdelay <chr>, weatherdelay <chr>, nasdelay <chr>,
## #   securitydelay <chr>, lateaircraftdelay <chr>
# Cache airlines Hive table into Spark
#tbl_cache(sc, 'airlines')
airlines_tbl <- tbl(sc, 'airlines')
airlines_tbl %>% print(width = Inf)
## # Source: spark<airlines> [?? x 2]
##    code  description                                         
##  * <chr> <chr>                                               
##  1 Code  Description                                         
##  2 02Q   Titan Airways                                       
##  3 04Q   Tradewind Aviation                                  
##  4 05Q   Comlux Aviation, AG                                 
##  5 06Q   Master Top Linhas Aereas Ltd.                       
##  6 07Q   Flair Airlines Ltd.                                 
##  7 09Q   Swift Air, LLC d/b/a Eastern Air Lines d/b/a Eastern
##  8 0BQ   DCA                                                 
##  9 0CQ   ACM AIR CHARTER GmbH                                
## 10 0FQ   Maine Aviation Aircraft Charter, LLC                
## # ... with more rows
# Cache airports Hive table into Spark
#tbl_cache(sc, 'airports')
airports_tbl <- tbl(sc, 'airports')
airports_tbl %>% print(width = Inf)
## # Source: spark<airports> [?? x 12]
##    id    name  city  country faa   icao  lat   lon   alt   tz_offset dst  
##  * <chr> <chr> <chr> <chr>   <chr> <chr> <chr> <chr> <chr> <chr>     <chr>
##  1 1     Goro… Goro… Papua … GKA   AYGA  -6.0… 145.… 5282  10        U    
##  2 2     Mada… Mada… Papua … MAG   AYMD  -5.2… 145.… 20    10        U    
##  3 3     Moun… Moun… Papua … HGU   AYMH  -5.8… 144.… 5388  10        U    
##  4 4     Nadz… Nadz… Papua … LAE   AYNZ  -6.5… 146.… 239   10        U    
##  5 5     Port… Port… Papua … POM   AYPY  -9.4… 147.… 146   10        U    
##  6 6     Wewa… Wewak Papua … WWK   AYWK  -3.5… 143.… 19    10        U    
##  7 7     Nars… Nars… Greenl… UAK   BGBW  61.1… -45.… 112   -3        E    
##  8 8     Godt… Godt… Greenl… GOH   BGGH  64.1… -51.… 283   -3        E    
##  9 9     Kang… Sond… Greenl… SFJ   BGSF  67.0… -50.… 165   -3        E    
## 10 10    Thul… Thule Greenl… THU   BGTL  76.5… -68.… 251   -4        E    
## # ... with more rows, and 1 more variable: tz_name <chr>

How many data points

Across whole data set using dplyr:

system.time({
out <- flights_tbl %>%
  group_by(year) %>%
  count() %>%
  arrange(year) %>%
  collect()
})
##    user  system elapsed 
##   0.096   0.000  64.988
out
## # A tibble: 23 x 2
## # Groups:   year [23]
##     year       n
##    <int>   <dbl>
##  1    NA      22
##  2  1987 1311826
##  3  1988 5202096
##  4  1989 5041200
##  5  1990 5270893
##  6  1991 5076925
##  7  1992 5092157
##  8  1993 5070501
##  9  1994 5180048
## 10  1995 5327435
## # ... with 13 more rows
out %>% ggplot(aes(x = year, y = n)) + geom_col()
## Warning: Removed 1 rows containing missing values (position_stack).

How many flights from JFK (New York) per year:

system.time({
out <- flights_tbl %>%
  filter(origin == "JFK") %>%
  group_by(year) %>%
  count() %>%
  arrange(year) %>%
  collect()
})
##    user  system elapsed 
##   0.092   0.000  41.842
out
## # A tibble: 22 x 2
## # Groups:   year [22]
##     year     n
##    <int> <dbl>
##  1  1987 12273
##  2  1988 51485
##  3  1989 48606
##  4  1990 47153
##  5  1991 42235
##  6  1992 42454
##  7  1993 41842
##  8  1994 43116
##  9  1995 46187
## 10  1996 44064
## # ... with 12 more rows
out %>% ggplot(aes(x = year, y = n)) + 
    geom_col() +
    labs(title = "Number of flights from JFK")

How many flights to JFK per year:

system.time({
out <- flights_tbl %>%
  filter(dest == "JFK") %>%
  group_by(year) %>%
  count() %>%
  arrange(year) %>%
  collect()
})
##    user  system elapsed 
##   0.068   0.000  31.899
out
## # A tibble: 22 x 2
## # Groups:   year [22]
##     year     n
##    <int> <dbl>
##  1  1987 12317
##  2  1988 52086
##  3  1989 49714
##  4  1990 47689
##  5  1991 42449
##  6  1992 42649
##  7  1993 41895
##  8  1994 43276
##  9  1995 46194
## 10  1996 44064
## # ... with 12 more rows
out %>% ggplot(aes(x = year, y = n)) + 
    geom_col() +
    labs(title = "Number of flights to JFK")

Create a model data set

Suppose we want to fit a linear regression of gain (depdelay - arrdelay) on distance, departure delay, and carriers using data from 2003-2007.

# Filter records and create target variable 'gain'
system.time(
  model_data <- flights_tbl %>%
    filter(!is.na(arrdelay) & !is.na(depdelay) & !is.na(distance)) %>%
    filter(depdelay > 15 & depdelay < 240) %>%
    filter(arrdelay > -60 & arrdelay < 360) %>%
    filter(year >= 2003 & year <= 2007) %>%
    left_join(airlines_tbl, by = c("uniquecarrier" = "code")) %>%
    mutate(gain = depdelay - arrdelay) %>%
    select(year, month, arrdelay, depdelay, distance, uniquecarrier, description, gain)
)
##    user  system elapsed 
##   0.008   0.000   0.005
model_data
## # Source: spark<?> [?? x 8]
##     year month arrdelay depdelay distance uniquecarrier description   gain
##  * <int> <int>    <int>    <int>    <int> <chr>         <chr>        <int>
##  1  2003     1       23       29      837 UA            United Air …     6
##  2  2003     1       52       18     1835 UA            United Air …   -34
##  3  2003     1       64       82      413 UA            United Air …    18
##  4  2003     1       54       38      413 UA            United Air …   -16
##  5  2003     1        5       17      413 UA            United Air …    12
##  6  2003     1      132      126      413 UA            United Air …    -6
##  7  2003     1        7       29     1723 UA            United Air …    22
##  8  2003     1       62       77     1723 UA            United Air …    15
##  9  2003     1      148      135     1723 UA            United Air …   -13
## 10  2003     1      101       97     1723 UA            United Air …    -4
## # ... with more rows
# Summarize data by carrier
model_data %>%
  group_by(uniquecarrier) %>%
  summarize(description = min(description), gain = mean(gain), 
            distance = mean(distance), depdelay = mean(depdelay)) %>%
  select(description, gain, distance, depdelay) %>%
  arrange(gain)
## Warning: Missing values are always removed in SQL.
## Use `MIN(x, na.rm = TRUE)` to silence this warning
## Warning: Missing values are always removed in SQL.
## Use `AVG(x, na.rm = TRUE)` to silence this warning

## Warning: Missing values are always removed in SQL.
## Use `AVG(x, na.rm = TRUE)` to silence this warning

## Warning: Missing values are always removed in SQL.
## Use `AVG(x, na.rm = TRUE)` to silence this warning
## Warning: Missing values are always removed in SQL.
## Use `MIN(x, na.rm = TRUE)` to silence this warning
## Warning: Missing values are always removed in SQL.
## Use `AVG(x, na.rm = TRUE)` to silence this warning

## Warning: Missing values are always removed in SQL.
## Use `AVG(x, na.rm = TRUE)` to silence this warning

## Warning: Missing values are always removed in SQL.
## Use `AVG(x, na.rm = TRUE)` to silence this warning
## Warning: Missing values are always removed in SQL.
## Use `MIN(x, na.rm = TRUE)` to silence this warning
## Warning: Missing values are always removed in SQL.
## Use `AVG(x, na.rm = TRUE)` to silence this warning

## Warning: Missing values are always removed in SQL.
## Use `AVG(x, na.rm = TRUE)` to silence this warning

## Warning: Missing values are always removed in SQL.
## Use `AVG(x, na.rm = TRUE)` to silence this warning
## Warning: Missing values are always removed in SQL.
## Use `MIN(x, na.rm = TRUE)` to silence this warning
## Warning: Missing values are always removed in SQL.
## Use `AVG(x, na.rm = TRUE)` to silence this warning

## Warning: Missing values are always removed in SQL.
## Use `AVG(x, na.rm = TRUE)` to silence this warning

## Warning: Missing values are always removed in SQL.
## Use `AVG(x, na.rm = TRUE)` to silence this warning
## # Source:     spark<?> [?? x 4]
## # Ordered by: gain
##    description                    gain distance depdelay
##  * <chr>                         <dbl>    <dbl>    <dbl>
##  1 ATA Airlines d/b/a ATA       -3.35     1135.     56.1
##  2 ExpressJet Airlines Inc. (1) -3.03      520.     59.4
##  3 Envoy Air                    -2.54      416.     53.1
##  4 Northwest Airlines Inc.      -2.20      779.     48.5
##  5 Delta Air Lines Inc.         -1.82      868.     50.8
##  6 AirTran Airways Corporation  -1.43      642.     55.0
##  7 Continental Air Lines Inc.   -0.962    1117.     57.0
##  8 American Airlines Inc.       -0.886    1074.     55.5
##  9 Endeavor Air Inc.            -0.639     467.     58.5
## 10 JetBlue Airways              -0.326    1139.     54.1
## # ... with more rows

Train a linear model

Spark MLlib

sparklyr provides interface for the Spark MLlib library for machine learning. RStudio cheat sheet provides a good start point.

Predict time gained or lost in flight as a function of distance, departure delay, and airline carrier.

# Partition the data into training and validation sets
model_partition <- model_data %>% 
  sdf_partition(train = 0.8, valid = 0.2, seed = 5555)

# Fit a linear model
system.time(
  ml1 <- model_partition$train %>%
    ml_linear_regression(gain ~ distance + depdelay + uniquecarrier)
)
##    user  system elapsed 
##   1.056   0.016 321.443
# Summarize the linear model
summary(ml1)
## Deviance Residuals (approximate):
##      Min       1Q   Median       3Q      Max 
## -270.308   -5.579    2.640    9.721  220.210 
## 
## Coefficients:
##      (Intercept)         distance         depdelay uniquecarrier_WN 
##      1.744310610      0.003265998     -0.014662325      0.876124221 
## uniquecarrier_AA uniquecarrier_MQ uniquecarrier_UA uniquecarrier_DL 
##     -5.314241548     -4.870711724     -3.670680336     -5.676217564 
## uniquecarrier_US uniquecarrier_OO uniquecarrier_EV uniquecarrier_NW 
##     -3.603629208     -2.367797953     -1.052889012     -5.783125644 
## uniquecarrier_XE uniquecarrier_CO uniquecarrier_OH uniquecarrier_FL 
##     -5.584317607     -5.513917180     -2.152535205     -4.454803437 
## uniquecarrier_AS uniquecarrier_DH uniquecarrier_YV uniquecarrier_B6 
##     -2.067187427     -0.754864480      0.123395029     -4.945583357 
## uniquecarrier_HP uniquecarrier_9E uniquecarrier_F9 uniquecarrier_TZ 
##     -0.894187818     -2.987736373     -3.885617746     -7.986040267 
## uniquecarrier_HA 
##     -3.132802810 
## 
## R-Squared: 0.02385
## Root Mean Squared Error: 17.74

Assess model performance

Compare the model performance using the validation data.

# Calculate average gains by predicted decile
system.time(
  model_deciles <- lapply(model_partition, function(x) {
    #sdf_predict(ml1, x) %>%
    ml_predict(ml1, x) %>%
      mutate(decile = ntile(desc(prediction), 10)) %>%
      group_by(decile) %>%
      summarize(gain = mean(gain)) %>%
      select(decile, gain) %>%
      collect()
  })
)
## Warning: Missing values are always removed in SQL.
## Use `AVG(x, na.rm = TRUE)` to silence this warning

## Warning: Missing values are always removed in SQL.
## Use `AVG(x, na.rm = TRUE)` to silence this warning
##    user  system elapsed 
##   0.284   0.004 110.052
model_deciles
## $train
## # A tibble: 10 x 2
##    decile   gain
##     <int>  <dbl>
##  1      1  6.02 
##  2      2  2.48 
##  3      3  1.71 
##  4      4  1.07 
##  5      5  0.337
##  6      6 -0.343
##  7      7 -1.23 
##  8      8 -2.25 
##  9      9 -2.70 
## 10     10 -3.78 
## 
## $valid
## # A tibble: 10 x 2
##    decile   gain
##     <int>  <dbl>
##  1      1  5.97 
##  2      2  2.51 
##  3      3  1.64 
##  4      4  1.06 
##  5      5  0.385
##  6      6 -0.498
##  7      7 -1.15 
##  8      8 -2.27 
##  9      9 -2.71 
## 10     10 -3.83
# Create a summary dataset for plotting
deciles <- rbind(
  data.frame(data = 'train', model_deciles$train),
  data.frame(data = 'valid', model_deciles$valid),
  make.row.names = FALSE
)
deciles
##     data decile       gain
## 1  train      1  6.0197657
## 2  train      2  2.4794602
## 3  train      3  1.7137775
## 4  train      4  1.0696767
## 5  train      5  0.3367499
## 6  train      6 -0.3434529
## 7  train      7 -1.2306961
## 8  train      8 -2.2462305
## 9  train      9 -2.7020457
## 10 train     10 -3.7753330
## 11 valid      1  5.9682166
## 12 valid      2  2.5054524
## 13 valid      3  1.6396994
## 14 valid      4  1.0643781
## 15 valid      5  0.3852696
## 16 valid      6 -0.4975022
## 17 valid      7 -1.1484578
## 18 valid      8 -2.2691283
## 19 valid      9 -2.7098733
## 20 valid     10 -3.8327128
# Plot average gains by predicted decile
deciles %>%
  ggplot(aes(factor(decile), gain, fill = data)) +
  geom_bar(stat = 'identity', position = 'dodge') +
  labs(title = 'Average gain by predicted decile', x = 'Decile', y = 'Minutes')

Visualize predictions

Compare actual gains to predicted gains for an out of time sample.

# Select data from an out of time sample
data_2008 <- flights_tbl %>%
  filter(!is.na(arrdelay) & !is.na(depdelay) & !is.na(distance)) %>%
  filter(depdelay > 15 & depdelay < 240) %>%
  filter(arrdelay > -60 & arrdelay < 360) %>%
  filter(year == 2008) %>%
  left_join(airlines_tbl, by = c("uniquecarrier" = "code")) %>%
  mutate(gain = depdelay - arrdelay) %>%
  select(year, month, arrdelay, depdelay, distance, uniquecarrier, description, gain, origin, dest)
data_2008
## # Source: spark<?> [?? x 10]
##     year month arrdelay depdelay distance uniquecarrier description  gain
##  * <int> <int>    <int>    <int>    <int> <chr>         <chr>       <int>
##  1  2008     1        2       19      810 WN            Southwest …    17
##  2  2008     1       34       34      515 WN            Southwest …     0
##  3  2008     1       11       25      688 WN            Southwest …    14
##  4  2008     1       57       67     1591 WN            Southwest …    10
##  5  2008     1       80       94      828 WN            Southwest …    14
##  6  2008     1       15       27     1489 WN            Southwest …    12
##  7  2008     1       16       28      838 WN            Southwest …    12
##  8  2008     1       37       51      220 WN            Southwest …    14
##  9  2008     1       19       32      220 WN            Southwest …    13
## 10  2008     1        6       20      220 WN            Southwest …    14
## # ... with more rows, and 2 more variables: origin <chr>, dest <chr>
# Summarize data by carrier
#carrier <- sdf_predict(ml1, data_2008) %>%
carrier <- ml_predict(ml1, data_2008) %>%
  group_by(description) %>%
  summarize(gain = mean(gain), prediction = mean(prediction), freq = n()) %>%
  filter(freq > 10000) %>%
  collect()
## Warning: Missing values are always removed in SQL.
## Use `AVG(x, na.rm = TRUE)` to silence this warning

## Warning: Missing values are always removed in SQL.
## Use `AVG(x, na.rm = TRUE)` to silence this warning
carrier
## # A tibble: 18 x 4
##    description                    gain prediction   freq
##    <chr>                         <dbl>      <dbl>  <dbl>
##  1 Endeavor Air Inc.             0.458    -0.596   35039
##  2 SkyWest Airlines Inc.        -1.23     -0.0663  85859
##  3 PSA Airlines Inc.            -2.28      0.410   36958
##  4 United Air Lines Inc.         2.02      0.588   97408
##  5 Frontier Airlines Inc.       -0.398     0.202   14674
##  6 ExpressJet Airlines Inc. (1) -0.330    -2.81    70037
##  7 Southwest Airlines Co.        4.18      4.01   224655
##  8 ExpressJet Airlines Inc.      0.539     1.32    56247
##  9 Continental Air Lines Inc.    2.36     -0.671   61386
## 10 Northwest Airlines Inc.      -2.85     -2.11    49220
## 11 JetBlue Airways              -0.975    -0.577   38257
## 12 AirTran Airways Corporation  -1.92     -1.14    44867
## 13 US Airways Inc.               2.40      0.468   59418
## 14 Envoy Air                    -2.54     -2.50    97086
## 15 Alaska Airlines Inc.          1.48      1.95    23806
## 16 Delta Air Lines Inc.         -1.42     -1.58    69066
## 17 Mesa Airlines Inc.            0.246     2.25    48208
## 18 American Airlines Inc.       -0.688    -0.893  131798
# Plot actual gains and predicted gains by airline carrier
ggplot(carrier, aes(gain, prediction)) + 
  geom_point(alpha = 0.75, color = 'red', shape = 3) +
  geom_abline(intercept = 0, slope = 1, alpha = 0.15, color = 'blue') +
  geom_text(aes(label = substr(description, 1, 20)), size = 3, alpha = 0.75, vjust = -1) +
  labs(title='Average Gains Forecast', x = 'Actual', y = 'Predicted')

Some carriers make up more time than others in flight, but the differences are relatively small. The average time gains between the best and worst airlines is only six minutes. The best predictor of time gained is not carrier but flight distance. The biggest gains were associated with the longest flights.

Disconnect from Spark

spark_disconnect_all()
## [1] 1