# R
# create a list of 25 integers
ints <- 1:25
result <- sapply(ints,function(x) x^2)
result
result <- sapply(ints,function(x) x^3)
result

# MapReduce
require(rmr2)
require(reshape) # needed for cast() at the end
rmr.options(backend = "local") # local or hadoop
# load a list of 25 integers into HDFS
hdfs.ints = to.dfs(1:25)
# mapper for the key-value pairs to compute squares and cubes
mapper <- function(k,v) {
  key <- v
  value <- c(key^2,key^3)
  keyval(key,value)
}
# run MapReduce
out = mapreduce(input = hdfs.ints, map = mapper)
# convert to a data frame
df = as.data.frame(from.dfs(out))
# reshape with n, n^2, n^3 in one row
# add identifiers for each row as they are consecutively the square and cube
df$powers <- c('n^2','n^3')
output <- cast(df,key ~ powers,value="val")
head(output)
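# Optional inspection (a sketch, not part of the original example): keys() and values()
# are rmr2 accessors for the object returned by from.dfs(), so the raw key-value pairs
# can be examined before reshaping, assuming 'out' from above is still in the workspace.
kv <- from.dfs(out)
head(keys(kv))   # the integers used as keys
head(values(kv)) # the computed powers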
# R
library(readr)
url <- "http://people.terry.uga.edu/rwatson/data/centralparktemps.txt"
t <- read_delim(url, delim = ',')
t$C = round((t$temperature - 32)*5/9,0)
a1 <- aggregate(t$C,by=list(t$month),FUN=max)
colnames(a1) = c('month', 'value')
a1$measure = 'max'
a2 <- aggregate(t$C,by=list(t$month),FUN=mean)
colnames(a2) = c('month', 'value')
a2$value = round(a2$value,1)
a2$measure = 'mean'
a3 <- aggregate(t$C,by=list(t$month),FUN=min)
colnames(a3) = c('month', 'value')
a3$measure = 'min'
# stack the results
stack <- rbind(a1,a2,a3)
library(reshape)
# reshape with month, max, mean, min in one row
stats <- cast(stack,month ~ measure,value="value")
stats

# MapReduce
require(rmr2)
rmr.options(backend = "local") # local or hadoop
require(reshape)
library(readr)
url <- "http://people.terry.uga.edu/rwatson/data/centralparktemps.txt"
t <- read_delim(url, delim = ',')
# save temperature in hdfs file
hdfs.temp <- to.dfs(data.frame(t))
# mapper converts each temperature to Celsius, keyed by month
mapper <- function(k,v) {
  key <- v$month
  value <- round((v$temperature - 32)*5/9,0) # temperature in Celsius
  keyval(key,value)
}
#reducer to report stats
reducer <- function(k,v) {
  key <- k # month
  value <- c(max(v),round(mean(v),1),min(v)) # v is list of values
  keyval(key,value)
}
out = mapreduce(
  input = hdfs.temp,
  map = mapper,
  reduce = reducer)
# convert to a data frame
df = as.data.frame(from.dfs(out))
# add measure identifiers
df$measure <- c('max','mean','min')
# reshape with month, max, mean, min in one row
stats <- cast(df,key ~ measure,value="val")
stats
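# Optional check (a sketch, not part of the original): the MapReduce statistics should
# agree with the aggregate() results from the plain R version, assuming a1, a2, and a3
# are still in the workspace and both tables are ordered by month.
all(stats$max == a1$value)
all(stats$mean == a2$value)
all(stats$min == a3$value)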
# R
library(readr)
url <- "http://people.terry.uga.edu/rwatson/data/centralparktemps.txt"
t <- read_delim(url, delim = ',')
t8 = t[t$month==8,]
max(t8$temperature)
mean(t8$temperature)
min(t8$temperature)

# MapReduce
library(rmr2)
rmr.options(backend = "local") # local or hadoop
library(reshape)
library(readr)
url <- "http://people.terry.uga.edu/rwatson/data/centralparktemps.txt"
t <- read_delim(url, delim = ',')
# save temperature in hdfs file
# the subsetting to August is done in the mapper
hdfs.temp <- to.dfs(data.frame(t))
# mapper selects the August temperature observations
mapper <- function(k,v) {
  keyval("August",subset(v$temperature,v$month==8))
}
#reducer to report stats
reducer <- function(k,v) {
  key <- k # month
  value <- c(length(v),max(v),round(mean(v),1),min(v)) # v is list of values
  keyval(key,value)
}
out = mapreduce(
  input = hdfs.temp,
  map = mapper,
  reduce = reducer)
# convert to a data frame
df = as.data.frame(from.dfs(out))
# add measure identifiers
df$measure <- c('observations','max','mean','min')
df
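# Optional check (a sketch, not part of the original): the four MapReduce values should
# match the plain R computations on the August subset, assuming t8 is still in the workspace.
df$val
c(length(t8$temperature), max(t8$temperature), round(mean(t8$temperature),1), min(t8$temperature))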
# R
require(lubridate)
url <- "http://people.terry.uga.edu/rwatson/data/electricityprices2010_14.csv"
e <- read.table(url, header=T,sep=',')
e$hour <- hour(e$timestamp)
e$month <- month(e$timestamp)
a <- aggregate(e$cost,by=list(e$hour),FUN=mean)
colnames(a) <- c('hour','average cost')
a

# MapReduce
require(rmr2)
rmr.options(backend = "local") # local or hadoop
require(lubridate)
require(sqldf)
url <- "http://people.terry.uga.edu/rwatson/data/electricityprices2010_14.csv"
e2 <- read.table(url, header=T,sep=',')
hdfs.temp <- to.dfs(data.frame(e2))
mapper <- function(k,v) {
  hour <- hour(v$timestamp)
  keyval(hour,v$cost)
}
#reducer to report stats
reducer <- function(k,v) {
  key <- k # hour
  value <- mean(v)
  keyval(key,value)
}
out = mapreduce(
  input = hdfs.temp,
  map = mapper,
  reduce = reducer)
# convert to a data frame
df = as.data.frame(from.dfs(out))
# name the columns
colnames(df) <- c('hour','average cost')
sqldf('SELECT * FROM df ORDER BY hour;')
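# Optional check (a sketch, not part of the original): the MapReduce averages should match
# the aggregate() result from the plain R version, assuming 'a' is still in the workspace.
all.equal(a$`average cost`, df$`average cost`[order(df$hour)])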
# R
library(readr)
url <- "http://people.terry.uga.edu/rwatson/data/GDP.csv"
g <- read_delim(url, delim=',')
g$level <- ifelse(g$GDP > 10000,'high','low')
table(g$level)

# MapReduce
require(rmr2)
rmr.options(backend = "local") # local or hadoop
library(readr)
url <- "http://people.terry.uga.edu/rwatson/data/GDP.csv"
g <- read_delim(url, delim=',')
hdfs.temp <- to.dfs(data.frame(g))
mapper <- function(k,v) {
  key <- ifelse(v$GDP > 10000,'high','low')
  keyval(key,1)
}
#reducer to report stats
reducer <- function(k,v) {
  key <- k # category
  value <- length(v)
  keyval(key,value)
}
out = mapreduce(
  input = hdfs.temp,
  map = mapper,
  reduce = reducer)
# convert to a data frame
as.data.frame(from.dfs(out))
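# Optional check (a sketch, not part of the original): the counts above should agree with
# the base R tabulation, assuming 'g' from the plain R version is still loaded.
table(g$level)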