
18 - Cluster computing

Answers to exercises

1.
Write a MapReduce function for the following situations. In each case, write the corresponding R code to understand how MapReduce and conventional programming differ.
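All of the MapReduce answers below follow the same rmr2 workflow: write the input to the distributed file system with to.dfs, transform it with mapreduce (supplying map and, where needed, reduce functions that emit key-value pairs via keyval), and read the result back with from.dfs. A minimal sketch of that shared pattern, assuming rmr2 is installed and run on the local backend (the sample input and the multiply-by-ten mapper are purely illustrative):

# minimal rmr2 pattern used throughout these answers
require(rmr2)
rmr.options(backend = "local")   # run without a Hadoop cluster
hdfs.in <- to.dfs(1:5)           # write the input to the (local) file system
out <- mapreduce(
  input = hdfs.in,
  map = function(k, v) keyval(v, v * 10)) # emit (key, value) pairs
as.data.frame(from.dfs(out))     # read the result back into R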
1a.
Compute the square and cube of the numbers in the range 1 to 25. Display the results in a data frame.
# R
# create a vector of the integers 1 to 25
ints <- 1:25
# compute the squares and cubes
squares <- sapply(ints, function(x) x^2)
cubes <- sapply(ints, function(x) x^3)
# display the results in a data frame, as the exercise asks
result <- data.frame(n = ints, square = squares, cube = cubes)
result

# MapReduce
require(rmr2)
require(reshape) # cast() below comes from the reshape package
rmr.options(backend = "local") # local or hadoop
# load a list of 25 integers into HDFS 
hdfs.ints = to.dfs(1:25)
# mapper for the key-value pairs to compute squares and cubes
mapper <- function(k,v) {
  key <- v
  value <- c(key^2,key^3)
  keyval(key,value)
}
# run MapReduce 
out = mapreduce(input = hdfs.ints, map = mapper)
# convert to a data frame
df = as.data.frame(from.dfs(out))
# reshape so that n, n^2, and n^3 appear in one row
# add row identifiers; for each key the square and cube are emitted consecutively
df$powers <- c('n^2','n^3')
output <- cast(df,key ~ powers,value="val")
head(output)
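For comparison, the table that the reshaped MapReduce output should resemble can be built directly in R; a quick sketch (the column names are illustrative):

# expected shape of the result: one row per n with its square and cube
check <- data.frame(n = 1:25, `n^2` = (1:25)^2, `n^3` = (1:25)^3, check.names = FALSE)
head(check)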
1b.
Using the average monthly temperatures for New York's Central Park, compute the maximum, mean, and minimum temperature in Celsius for each month.
# R
library(readr)
url <-  "http://people.terry.uga.edu/rwatson/data/centralparktemps.txt"
t <- read_delim(url, delim = ',')
t$C = round((t$temperature - 32)*5/9,0)
a1 <-  aggregate(t$C,by=list(t$month),FUN=max)
colnames(a1) = c('month', 'value')
a1$measure = 'max'
a2 <-  aggregate(t$C,by=list(t$month),FUN=mean)
colnames(a2) = c('month', 'value')
a2$value = round(a2$value,1)
a2$measure = 'mean'
a3 <-  aggregate(t$C,by=list(t$month),FUN=min)
colnames(a3) = c('month', 'value')
a3$measure = 'min'
# stack the results
stack <-  rbind(a1,a2,a3) 
library(reshape)
# reshape with month, max, mean, min in one row
stats <-  cast(stack,month ~ measure,value="value")
stats
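The three aggregate() calls and the rbind/cast reshaping can also be collapsed into a single pass by returning all three measures from one summary function; a sketch, reusing t and t$C from above (flattening the matrix column with do.call is the only new step):

# compute max, mean, and min per month in one aggregate() call
stats2 <- aggregate(t$C, by = list(month = t$month),
                    FUN = function(x) c(max = max(x), mean = round(mean(x), 1), min = min(x)))
stats2 <- do.call(data.frame, stats2) # flatten the matrix column into separate columns
colnames(stats2) <- c('month', 'max', 'mean', 'min')
stats2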
  
# MapReduce  
require(rmr2)
rmr.options(backend = "local") # local or hadoop
require(reshape)
library(readr)
url <- "http://people.terry.uga.edu/rwatson/data/centralparktemps.txt"
t <- read_delim(url, delim = ',')
# save temperature in hdfs file
hdfs.temp <- to.dfs(data.frame(t))
# mapper for computing temperature measures for each month
mapper <- function(k,v) {
  key <- v$month
  value <- round((v$temperature - 32)*5/9,0) # temperature in Celsius
  keyval(key,value)
}
# reducer to report stats
reducer <- function(k,v) {
  key <- k # month
  value <- c(max(v),round(mean(v),1),min(v)) # v is the list of values for the month
  keyval(key,value)
}
out = mapreduce(
  input = hdfs.temp,
  map = mapper,
  reduce = reducer)
# convert to a data frame
df = as.data.frame(from.dfs(out))
# add measure identifiers
df$measure <- c('max','mean','min')
# reshape with month, max, mean, min in one row
stats <- cast(df,key ~ measure,value="val")
stats
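The cast converts the long key/val/measure frame returned by from.dfs into one row per month; a quick look at both forms makes the reshape easier to follow:

head(df, 6)  # long form before the cast: key (month), val (temperature), measure
head(stats)  # wide form after the cast: one row per month with max, mean, min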
1c.
Using the average monthly temperatures for New York's Central Park, compute the max, mean, and min for August.
# R
library(readr)
url <-  "http://people.terry.uga.edu/rwatson/data/centralparktemps.txt"
t <- read_delim(url, delim = ',')
t8 = t[t$month==8,]
max(t8$temperature)
mean(t8$temperature)
min(t8$temperature)

# MapReduce
library(rmr2)
rmr.options(backend = "local") # local or hadoop
library(reshape)
library(readr)
url <- "http://people.terry.uga.edu/rwatson/data/centralparktemps.txt"
t <- read_delim(url, delim = ',')
# store the whole data frame in HDFS; the mapper subsets to August
hdfs.temp <- to.dfs(data.frame(t))
# mapper to emit the August temperatures
mapper <- function(k,v) {
  keyval("August",subset(v$temperature,v$month==8))
}
# reducer to report stats
reducer <- function(k,v) {
  key <- k # month
  value <- c(length(v),max(v),round(mean(v),1),min(v)) # v is the list of values
  keyval(key,value)
}
out = mapreduce(
  input = hdfs.temp,
  map = mapper,
  reduce = reducer)
# convert to a data frame
df = as.data.frame(from.dfs(out))
# add measure identifiers
df$measure <- c('observations','max','mean','min')
df
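The mapper above hard-codes August. A hedged sketch of how the same filter could be parameterized for any month; target.month is illustrative, month.name is the built-in R vector of month names, and the local backend is assumed so the mapper can see the variable:

# hypothetical: filter on any month rather than hard-coding August
target.month <- 8
mapper <- function(k,v) {
  keyval(month.name[target.month], subset(v$temperature, v$month == target.month))
}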
1d.
Using the electricity price data, compute the average hourly cost.
#R
require(lubridate)
url <- "http://people.terry.uga.edu/rwatson/data/electricityprices2010_14.csv"
e <- read.table(url, header=T,sep=',')
e$hour <- hour(e$timestamp)
e$month <- month(e$timestamp)
a <- aggregate(e$cost,by=list(e$hour),FUN=mean)
colnames(a) <- c('hour','average cost')
a

# MapReduce
require(rmr2)
rmr.options(backend = "local") # local or hadoop
require(lubridate)
require(sqldf)
url <- "http://people.terry.uga.edu/rwatson/data/electricityprices2010_14.csv"
e2 <- read.table(url, header=T,sep=',')
hdfs.temp <- to.dfs(data.frame(e2))
mapper <- function(k,v) {
  hour <- hour(v$timestamp)
  keyval(hour,v$cost)
}
# reducer to compute the average cost for each hour
reducer <- function(k,v) {
  key <- k # hour
  value <- mean(v)
  keyval(key,value)
}
out = mapreduce(
  input = hdfs.temp,
  map = mapper,
  reduce = reducer)
# convert to a data frame
df = as.data.frame(from.dfs(out))
# label the columns
colnames(df) <- c('hour','average cost')
sqldf('SELECT * FROM df ORDER BY hour;')
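sqldf is used above only to order the rows; plain indexing gives the same result without the extra package, a sketch:

# order by hour without sqldf
df[order(df$hour), ]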
1e.
Read the national GDP file, which records GDP in millions, and count how many countries have a GDP greater than or less than 10,000 million.
# R
library(readr)
url <- "http://people.terry.uga.edu/rwatson/data/GDP.csv"
g <- read_delim(url, delim=',')
g$level <-  ifelse(g$GDP > 10000,'high','low')
table(g$level)

# MapReduce
require(rmr2)
rmr.options(backend = "local") # local or hadoop
library(readr)
url <- "http://people.terry.uga.edu/rwatson/data/GDP.csv"
g <- read_delim(url, delim=',')
hdfs.temp <- to.dfs(data.frame(g))
mapper <- function(k,v) {
  key <- ifelse(v$GDP > 10000,'high','low')
  keyval(key,1)
}
# reducer to count the countries in each category
reducer <- function(k,v) {
  key <- k # category
  value <- length(v)
  keyval(key,value)
}
out = mapreduce(
  input = hdfs.temp,
  map = mapper,
  reduce = reducer)
# convert to a data frame
as.data.frame(from.dfs(out))
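As a quick cross-check, the counts from the conventional answer should match the MapReduce counts; a sketch, reusing the g data frame loaded above:

# counts from plain R, for comparison with the MapReduce output
table(ifelse(g$GDP > 10000, 'high', 'low'))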