# R
# create a vector of 25 integers
ints <- 1:25
result <- sapply(ints,function(x) x^2)
result
result <- sapply(ints,function(x) x^3)
result
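# A hedged sketch (not part of the original example): both powers can be
# computed in one pass; sapply here returns a 2 x 25 matrix, mirroring the
# MapReduce mapper below, which emits the square and cube together
both <- sapply(ints, function(x) c(x^2, x^3))
both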
# MapReduce
require(rmr2)
require(reshape) # needed for cast()
rmr.options(backend = "local") # local or hadoop
# load a vector of 25 integers into HDFS
hdfs.ints = to.dfs(1:25)
# mapper for the key-value pairs to compute squares
mapper <- function(k,v) {
  key <- v
  value <- c(key^2, key^3)
  keyval(key, value)
}
# run MapReduce
out = mapreduce(input = hdfs.ints, map = mapper)
# convert to a data frame
df = as.data.frame(from.dfs(out))
# reshape so that n, n^2, and n^3 appear in one row
# add identifiers for each row, as the square and cube appear consecutively
df$powers <- c('n^2','n^3')
output <- cast(df,key ~ powers,value="val")
head(output)
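# Optional sketch (not in the original): from.dfs(out) returns the raw
# key-value pairs emitted by the job; rmr2's keys() and values() can be used
# to inspect them directly
raw <- from.dfs(out)
keys(raw)
values(raw)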
# R
library(readr)
url <- "http://people.terry.uga.edu/rwatson/data/centralparktemps.txt"
t <- read_delim(url, delim = ',')
t$C = round((t$temperature - 32)*5/9,0)
a1 <- aggregate(t$C,by=list(t$month),FUN=max)
colnames(a1) = c('month', 'value')
a1$measure = 'max'
a2 <- aggregate(t$C,by=list(t$month),FUN=mean)
colnames(a2) = c('month', 'value')
a2$value = round(a2$value,1)
a2$measure = 'mean'
a3 <- aggregate(t$C,by=list(t$month),FUN=min)
colnames(a3) = c('month', 'value')
a3$measure = 'min'
# stack the results
stack <- rbind(a1,a2,a3)
library(reshape)
# reshape with month, max, mean, min in one row
stats <- cast(stack,month ~ measure,value="value")
stats
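# A compact base R alternative (a sketch giving the same content as stats):
# tapply computes each measure by month without stacking and reshaping
cbind(max  = tapply(t$C, t$month, max),
      mean = round(tapply(t$C, t$month, mean), 1),
      min  = tapply(t$C, t$month, min))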
# MapReduce
require(rmr2)
rmr.options(backend = "local") # local or hadoop
require(reshape)
library(readr)
url <- "http://people.terry.uga.edu/rwatson/data/centralparktemps.txt"
t <- read_delim(url, delim = ',')
# save temperature in hdfs file
hdfs.temp <- to.dfs(data.frame(t))
# mapper for computing temperature measures for each month
mapper <- function(k,v) {
  key <- v$month
  value <- round((v$temperature - 32)*5/9, 0) # temperature in Celsius
  keyval(key, value)
}
# reducer to report stats
reducer <- function(k,v) {
  key <- k # month
  value <- c(max(v), round(mean(v),1), min(v)) # v is the list of values
  keyval(key, value)
}
out = mapreduce(
  input = hdfs.temp,
  map = mapper,
  reduce = reducer)
# convert to a data frame
df = as.data.frame(from.dfs(out))
# add measure identifiers
df$measure <- c('max','mean','min')
# reshape with month, max, mean, min in one row
stats <- cast(df,key ~ measure,value="val")
stats
# R
library(readr)
url <- "http://people.terry.uga.edu/rwatson/data/centralparktemps.txt"
t <- read_delim(url, delim = ',')
t8 = t[t$month==8,]
max(t8$temperature)
mean(t8$temperature)
min(t8$temperature)
# MapReduce
library(rmr2)
rmr.options(backend = "local") # local or hadoop
library(reshape)
library(readr)
url <- "http://people.terry.uga.edu/rwatson/data/centralparktemps.txt"
t <- read_delim(url, delim = ',')
# save the temperature data in an HDFS file; subsetting to August is done in the mapper
hdfs.temp <- to.dfs(data.frame(t))
# mapper for computing temperature measures
mapper <- function(k,v) {
  keyval("August", subset(v$temperature, v$month == 8))
}
# reducer to report stats
reducer <- function(k,v) {
  key <- k # month
  value <- c(length(v), max(v), round(mean(v),1), min(v)) # v is the list of values
  keyval(key, value)
}
out = mapreduce(
  input = hdfs.temp,
  map = mapper,
  reduce = reducer)
# convert to a data frame
df = as.data.frame(from.dfs(out))
# add measure identifiers
df$measure <- c('observations','max','mean','min')
df
# R
require(lubridate)
url <- "http://people.terry.uga.edu/rwatson/data/electricityprices2010_14.csv"
e <- read.table(url, header=T,sep=',')
e$hour <- hour(e$timestamp)
e$month <- month(e$timestamp)
a <- aggregate(e$cost,by=list(e$hour),FUN=mean)
colnames(a) <- c('hour','average cost')
a
# MapReduce
require(rmr2)
rmr.options(backend = "local") # local or hadoop
require(lubridate)
require(sqldf)
url <- "http://people.terry.uga.edu/rwatson/data/electricityprices2010_14.csv"
e2 <- read.table(url, header=T,sep=',')
hdfs.temp <- to.dfs(data.frame(e2))
mapper <- function(k,v) {
  hour <- hour(v$timestamp)
  keyval(hour, v$cost)
}
# reducer to report the mean cost per hour
reducer <- function(k,v) {
  key <- k # hour of the day
  value <- mean(v)
  keyval(key, value)
}
out = mapreduce(
  input = hdfs.temp,
  map = mapper,
  reduce = reducer)
# convert to a data frame
df = as.data.frame(from.dfs(out))
# name the columns
colnames(df) <- c('hour','average cost')
sqldf('SELECT * FROM df ORDER BY hour;')
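# The same ordering without sqldf, using base R (an equivalent sketch)
df[order(df$hour), ]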
# R
library(readr)
url <- "http://people.terry.uga.edu/rwatson/data/GDP.csv"
g <- read_delim(url, delim=',')
g$level <- ifelse(g$GDP > 10000,'high','low')
table(g$level)
# MapReduce
require(rmr2)
rmr.options(backend = "local") # local or hadoop
library(readr)
url <- "http://people.terry.uga.edu/rwatson/data/GDP.csv"
g <- read_delim(url, delim=',')
hdfs.temp <- to.dfs(data.frame(g))
mapper <- function(k,v) {
  key <- ifelse(v$GDP > 10000,'high','low')
  keyval(key, 1)
}
# reducer to count the rows in each category
reducer <- function(k,v) {
  key <- k # category (high or low)
  value <- length(v) # number of rows in the category
  keyval(key, value)
}
out = mapreduce(
  input = hdfs.temp,
  map = mapper,
  reduce = reducer)
# convert to a data frame
as.data.frame(from.dfs(out))
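# Cross-check (a sketch, assuming g from the plain R section is still in memory):
# the high/low counts should match the plain R tabulation above
table(g$level)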