# Un MapReduce Pythonico...
(versione del Jupiter Notebook del corso di Bid Data Analysis del cineca riadattata da Paolo Avogadro)

tramite Hadoop Streaming si puo' fare MapReduce con Python (o altri linguaggi), non si e' piu' legati a Java

Possiamo migliorare ancora le cose?

Che tipo di problemi possono esserci?

Con Hadoop Streaming dobbiamo lavorare direttamente con l'HDFS
* spostare i file di input nell' HDFS
* richiamare i file di output
* questo aumenta la possibilita' di fare errori 

Il debugging puo' essere difficile

* logs via jobtracker
* gli errori sono spesso correlati con Java stacktrace, piuttosto che con il job MapReduce

E' necessario scrivere (almeno) 2 file differenti
 * uno per il mapper
 * uno per il reducer

tutto questo non e' un **modulo** di Python (che sono oggetti semplici e puliti)

Le **classi** di Python sono pero' uno strumento ideale che:

 * potrebbe rendere tutti questi passi piu' semplici 
 * uniformare un job MapReduce in modo che sia chiaro e facilmente debuggabile.

# MRJob 
A more pythonic MapReduce library from Yelp

<img src='https://avatars1.githubusercontent.com/u/49071?v=3&s=400' width=300>

> “Easiest route to Python programs that run on Hadoop”

si installa facilmente (supponendo di avere Python) tramite: 
```bash
pip install mrjob
```

**Running modes** ( sono le modalita' in cui si puo' usare MrJob)
* Consentono di testare il codice in locale senza dover installare Hadoop 
* Oppure fare girare i job su un cluster a propria scelta, per esempio:
    - Si integra con Amazon **Elastic MapReduce** (EMR)
    - il codice e' lo stesso  in locale, su Hadoop o EMR
    - rende sempice fare girare un job sul cloud 

### Come funziona MRJob?

* E' un modulo di Python scritto **on top of Hadoop Streaming (HS)**
    - HS jar apre un sottoprocesso del nostro codice 
    - gli manda l' input via stdin
    - raccoglie i risultati dallo stdout
* funge da wrapper per HDFS per il pre e post processing (se c'e' hadoop)
* costruisce una interfaccia consistente tra tutti gli ambienti supportati 
* in modo automatico serializza/deserializza il flusso dei dati da ogni task 
    - e.g. JSON: json.loads() and json.dumps()

## Facciamo un po' di prove

In [1]:
from mrjob.job import MRJob

Un  ** job ** e' definito da una classe presa dal pacchetto MRJob 

* Contiene metodi che definscono i passi (**step**) di un job di Hadoop
* Uno “step” consiste di un  **mapper**, un **combiner**,  un **reducer**. 
* Tutti questi step sono opzionali, ma ne serve almeno uno (altrimenti non stiamo facendo nulla...)

Proviamo a vedere un template di questa classe:

In [4]:
class myjob(MRJob):                      # eredito la classe da MRjob
    def mioMapper(self, _, line):        # l'argomento  _  e'perche' di spesso la chiave d'ingresso non serve
        pass                             # senza il pass il template da errore
    def mioCombiner(self, key, values):  # combiner esegue sul mapper, prima che lo shuffle sia stato effettuato
        pass
    def mioReducer(self, key, values):   # reducer
        pass

## WordCount
contiamo quante parole ci sono in un testo.

### Mapper

Il metodo mapper() prende una chiave e un valore come argomenti

```python
    def mioMapper(self, _, line):  
        pass
```

* In questo esempio la **key e' ignorata** (in quanto, per esempio, non ci serve sapere quale sia il numero di riga) mentre la riga stessa e' il valore, questo per la forma di default del protocollo

* Questa  "funzione" restituisce (**yield**)  un numero di coppie chiave valore che dipende dall'input (perche' il termine "funzione" e' tra virgolette? vedremo poi)
* ricordiamo quindi che un mapper associa ad **un** unico input, **molte** chiavi-valori come output

### Yield?

**Attenzione**: `yield` e' DIVERSO da `return`: 

 `yield` trasforma una funzione in una **generator function**:

- se chiamo una funzione piu' volte, ogni volta tutti i suoi argomenti saranno "reinizializzati" e non ci sara' "memoria" della chiamata precedente
- Se e' presente uno **yield** l'oggetto creato e' un **generator**, non una *funzione* 
- **yield** a differenza di **return** "congela" il generator, la prossima volta che verra' chiamato ripartira' dal punto dove e' stato chiamato lo **yield**.
- un **generator** ha, di default, varie proprieta', per esempio il metodo `next()`
- Utile: https://docs.python.org/3/tutorial/classes.html

In [1]:
# esempio di generatore che funziona come un iterator
def mioGen():       #  e' praticamente identico ad una funzione.
     i=0
     while i<10:
        i+=1
        yield i     # qui invece e' diverso...
        
a=mioGen()

In [2]:
next(a)

1

cosa succede se cerco di chiamare un'altra volta 'a'?

In [3]:
next(a)

2

In [7]:
a=mioGen()          
i=0
while i < 10:
    i+=1
    print(next(a))

1
2
3
4
5
6
7
8
9
10


In [5]:
print(next(mioGen()))    # qui non e' ancora stato instanziato
print(next(mioGen()))
b=mioGen()               # questo crea una istanza del generatore
c=b                      # questo crea un nuovo NOME dell'istanza del generatore  
d=mioGen()               # questo crea una istanza differente del generatore
print(next(b))
print(next(b))
print(next(c))
print(next(d))

1
1
1
2
3
1


In [23]:
# quando uso un loop in Python su un oggetto-contenitore, uso la funzione iter() 
#e trasformo quel contenitore in un iteratore
stringa = 'abcd' 
#print(next(stringa))
Istringa = iter(stringa)    # crea istanza dell'iteratore
print(next(Istringa))

a


In [24]:
print(next(Istringa))

b


Occhio, un generatore puo' esaurirsi! a quel punto non si puo' piu' chiamare il metodo next.

In [7]:
next(a)

StopIteration: 

In [8]:
def mygen():
    for i in range(1,10):
        yield i, 'valore'          # restituisce 2 oggetti

for chiave, valore in mygen():     # chiamo ripetutamente la stessa 'funzione' 
    print (chiave, valore)         # che da risultati diversi ogni volta: e' un generatore

1 valore
2 valore
3 valore
4 valore
5 valore
6 valore
7 valore
8 valore
9 valore


In [1]:
lista = [1,2,3]
type(lista)

list

In [2]:
type(iter(lista))

list_iterator

Proviamo ora a scrivere un generatore tale che, data una stringa con delle parole, separate da spazi, "emetta" coppie formate da: 
 * primo oggetto emesso, la parola stessa
 * secondo oggetto emesso 1  (ho trovato, in questo punto la data parola)

In [3]:
linea = 'nel mezzo del cammin i i nel nel nel nel'
def emetti(testo):
    for parola in testo.split():
        yield parola, 1

for chiave, valore in emetti(linea):  # chiamo ripetutamente la stessa funzione
    print (chiave, valore)            # che con yield e' un generatore e da risultati diversi

nel 1
mezzo 1
del 1
cammin 1
i 1
i 1
nel 1
nel 1
nel 1
nel 1


### Reducer

Il metodo usato come **riduttore()** prende come argomenti (a parte self):
* una **chiave** 
* un **iteratore** di valori  (ricordiamo che solo oggetti con la medesima chiave intermedia arrivano ai rducer)

```python
    def mioRiduttore(self, key, values):
        pass
```

* Anche in questo caso il numero di coppie chiave-valore emesse dipende da come e' fatto il reducer stesso e da quali argomenti riceve

* Per esempio somma i valori di ogni chiave    

* anche il reducer puo' usare yield 

* un **iteratore** assomiglia ad un generatore ma e' un po' piu' generale: basta prendere un oggetto composto (p.es. una lista) e usare ```iter(miaLista) ```. A questo punto potremo chiamare "chiamare" i vari componenti tramite il metodo ```next()``` come nel caso dei generatori.

## Scriviamo il nostro primo job

In [2]:
# Configuriamo alcune variabili d'ambiente di  LINUX

mydir = "."                
%env mydir = $mydir
myinput = "testo.txt"                # nome del testo che vogliamo processare
%env myinput = $myinput    
myscript = mydir + "/wordcount.py"   # script che voglio usare
%env myscript = $myscript

%system mkdir -p $mydir           
%env myoutput $mydir/out.txt
%env mylog $mydir/out.log

env: mydir=.
env: myinput=testo.txt
env: myscript=./wordcount.py
env: myoutput=./out.txt
env: mylog=./out.log


Scriviamo il file con il job **wordcount**, in modo che:
* prenda un file di testo in ingresso
* emetta come risultato il numero di occorrenze di ciascuna parola (considerando parole, per cui l'unica differenza e' la maiuscola come parole identiche) 
* conti il numero di volte che una parola appare all'interno del testo

In [None]:
xxxxxxxxxxxxxxxxxxxxxxxxx
xxxxxxxxxxxxxxxxxxxxxxxxx
xxxxxxxxxxxxxxxxxxxxxxxxx

In [3]:
%%writefile mioScript.py             

from mrjob.job import MRJob
class MRWordCount(MRJob):             # eredita classe da MRJob 
    """ Wordcount with MapReduce in a pythonic way"""

    def mapper(self, key, line):
        for word in line.split():    # prende la linea, la divide in parole separate da spazi e ne crea una lista
             yield word.lower(), 1      # prende le parole della lista e con .lower rende tutto lowercase; restituisce nello stdout una parola e l' 1

                # Qui MRjob sta facendo del lavoro: prende tutte i valori intermedi e li mette insieme
                # in un iteratore che viene passato al reducer!
                
                
    def reducer(self, word, occurrences):  # prende lo stdin, le parole emesse dal mapper
        yield word, sum(occurrences)       # ricorda che il modo in cui e' ricevuto e'  (parola, (1,1,1,1,1)) 

if __name__ == '__main__':                 # queste due linee sono fondamentali 
    MRWordCount.run()                      # senza non succede nulla, e' solo una classe: cosi' esegue

Writing mioScript.py


## Nota
tramite MRJob, e il fatto che si usino dei generatori,
non dobbiamo controllare all'intero del **reducer** quando il **valore** sta cambiando

## I/O
Si pue' passare l'input da processare come STDIN, pero' in ogni caso mrjob lo mettera' in un *file* all'inizio.
Esempio di come dare in pasto un file da processare:

```bash
$ python my_job.py < input.txt
```
Si possono passare molteplici file di input usando il carattere -, per esempio

```bash
$ python my_job.py input1.txt input2.txt - < input3.txt
```
Di default l'output va nello stdout

```bash
$ python my_job.py input.txt
```


In [4]:
# Eseguiamo il nostro primo job di MrJob
! python $myscript $myinput 1> $myoutput 2> $mylog  

In [5]:
%%bash
cat $myoutput

In [23]:
%%bash
more $myoutput

Proviamo a fare un singolo job che, dato un file di testo:
    - calcoli il numero di parole
    - calcoli il numero di righe
    - calcoli il numero di lettere

Tutto questo deve avvenire con un singolo mapper e un singolo reducer, come posso farlo?

In [25]:
%%writefile summerize.py  

from mrjob.job import MRJob
class riassunto(MRJob):
    def mapper(self,_,linea):
        yield 'caratteri', len(linea)
        yield 'parole', len(linea.split())
        yield 'righe', 1
        
    def reducer(self, chiave, valore):
        yield chiave, sum(valore)      

if __name__ == '__main__':              
    riassunto.run()                    # metodo .run() e' necessario


Overwriting summerize.py


In [26]:
!python summerize.py lungo.txt  1> riassunto.txt 2> errori.txt

Esempio di un **template** (pensato per essere eseguibile)

```python
#!/usr/bin/env python
# -*- coding: utf-8 -*-
""" MapReduce easily with Python """

from mrjob.job import MRJob
from mrjob.step import MRStep

class job(MRJob):
    def mapper(self, _, line):
        pass
    def reducer(self, key, line):
        pass
    def steps(self):
        return [                    # versione ok, quella senza importare MRStep e' DEPRECATA
            MRStep(mapper=self.mapper, reducer=self.reducer)
        ]

if __name__ == "__main__":
    job.run()
    
```

con MRStep si possono gestire i vari passi

# Due parole sui combiner e i reducer
* Un combiner, assomiglia ad un reducer, ma prende i dati direttamente da **un solo** mapper
* puo' essere pensato come un modo per ridurre la quantita' di dati che deve poi circolare tra i nodi
* solo alcuni tipi di funzioni possono essere usate in un combiner
   - funzioni commutative e  associative
   - deve emettere coppie chive valore che vadano bene per il reducer

## Esercizio

Costruiamo un job <Mapreduce> tale che 
- ci dica quante volte ognuna delle vocali appare in un testo
- ci dica il numero TOTALE di vocali nel testo

In [13]:
%%writefile cercaVocali.py  

#!/usr/bin/env python
# -*- coding: utf-8 -*-
""" MapReduce easily with Python """

from mrjob.job import MRJob
from mrjob.step import MRStep

class riassuntoVocali(MRJob):
    def mapper(self,_,linea):
        vocali=('a','i','u','e','o')   # tupla di vocali
        for lettera in range (0,len(linea)):  # gira sulle lettere della linea
            if linea[lettera].lower() in vocali:
                yield linea[lettera].lower(), 1
                yield 'numero vocali',1
                
    def reducer(self, chiave, valore):
        yield chiave, sum(valore)      

if __name__ == '__main__':              
    riassuntoVocali.run() 

Overwriting cercaVocali.py


In [8]:
!python cercaVocali.py testo.txt  1> riassunto.txt 2> errori.txt

## Running Modes:

Di default MRJob gira in locale, in un singolo processo di Python.
Il modo in cui gira puo' essere cambiato tramite i **running modes**,
scelti attraverso l'opzione `-r/--runner`


```bash
-r inline, -r local, -r hadoop, or -r emr
```

E' inoltre possibile usare l'opzione `--verbose` per mostrare tutti i passi

quindi basta usare  `-r hadoop` per fare girare il job su hadoop

<small>Nota: La *magic* `capture` e' un altro modo che possiamo usare per catturare l'output.</small>

### Protocolli / Protocols

http://mrjob.readthedocs.org/en/latest/guides/writing-mrjobs.html#protocols

L'idea e' che si possano gestire in modo automatico molte caratteristiche,
in particolare di formato (sia in ingresso che in uscita, ed eventualmente 
tra uno step e il successivo)



MRJob ha molte utili opzioni

anche se alcune di loro possono risultare costose (in termini di tempistiche) quando si fa un job pesante

Per esempio e' possibile definire come sono i formati tra un passo e l'altro di MapReduce

In [None]:
class MyMRJob(mrjob.job.MRJob):

    # these are the defaults
    INPUT_PROTOCOL = mrjob.protocol.RawValueProtocol     # protocollo standard di input che legge le righe
    INTERNAL_PROTOCOL = mrjob.protocol.JSONProtocol      # protocollo di comunicazione tra, per es. mapper e reducer 
    OUTPUT_PROTOCOL = mrjob.protocol.JSONProtocol        # protocollo di uscita


In [15]:
%%writefile protocol.py
from mrjob.job import MRJob
from mrjob.protocol import PickleProtocol   # importiamo i protocolli pickle

class MRWordCount(MRJob):

    # Optimization on internal protocols
    INTERNAL_PROTOCOL = PickleProtocol      # i dati sono trasferiti compattati tra mapper e reducer
    OUTPUT_PROTOCOL = PickleProtocol        # i dati in uscita sono comunque in formato pickle
    
    def mapper(self, key, line):
        for word in line.split( ):
             yield word.lower(), 1

                
    def reducer(self, word, occurrences):
        yield word, sum(occurrences)

if __name__ == '__main__':
    MRWordCount.run()

Overwriting protocol.py


In [16]:
! python protocol.py /data/lectures/data/books/twolines.txt 2> /dev/null

## Gestire i vari step di un processo MapReduce

mrjob puo' essere configurato per fare diversi step 

Per ogni step e' necessario specificare quale parte deve essere eseguita 
e il metodo da usare all'interno della classe, per esempio: 


In [17]:
def steps(self):
    return [
        MRStep(
            mapper=self.mapper_get_words,          # indico quale e' il nome del mapper
            combiner=self.combiner_count_words,    # combiner  
            reducer=self.reducer_count_words),     # reducer
        MRStep(reducer=self.reducer_find_max_word) # riduttore finale
    ]

Proviamo a vedere un processo mapreduce che ci fa vedere le parole piu' frequenti.
Pensiamo ad un processo a piu' passi in cui il risultato finale sia la parola piu' frequente all'interno di un testo
 


In [11]:
%%writefile parolaFrequente.py


from mrjob.job import MRJob
from mrjob.step import MRStep
import re

WORD_RE = re.compile(r"[\w']+")

class MRMostUsedWord(MRJob):

    def mapper_get_words(self, _, line):
        # yield each word in the line
        for word in WORD_RE.findall(line):
            yield (word.lower(), 1)

    def combiner_count_words(self, word, counts):
        # sum the words we've seen so far
        yield (word, sum(counts))

    def reducer_count_words(self, word, counts):
        # send all (num_occurrences, word) pairs to the same reducer.
        # num_occurrences is so we can easily use Python's max() function.
        yield None, (sum(counts), word)

    # discard the key; it is just None
    def reducer_find_max_word(self, _, word_count_pairs):
        # each item of word_count_pairs is (count, word),
        # so yielding one results in key=counts, value=word
        yield max(word_count_pairs)

    def steps(self):
        return [
            MRStep(mapper=self.mapper_get_words,
                   combiner=self.combiner_count_words,
                   reducer=self.reducer_count_words),
            MRStep(reducer=self.reducer_find_max_word)
        ]


if __name__ == '__main__':
    MRMostUsedWord.run()

Overwriting parolaFrequente.py


In [12]:
!python parolaFrequente.py divina.txt 2> problemi.txt

4262	"e"


# Riassunto di MrJob

 [documentazione della versione piu' recente](http://mrjob.readthedocs.org/en/latest/).

(fatta bene senza diventare troppo complicata)

Esistono molte opzioni e possibilita' avanzate da scoprire.

<small>
Nota : Gli sviluppatori possono aiutare: https://github.com/Yelp/mrjob/issues/1142
</small>



Ad majora:

* con MrJob non e' possibile connettersi in **remoto** ad un cluster Hadoop. 
    - (questo perche' Hadoop non consente che si inviino dei job dall'esterno (classi o eseguibili).
* d'altra parte ad EMR su Amazon si puo' accedere anche dal proprio laptop.
    - Amazon ha creato il seguente [boto api](http://boto.readthedocs.org/en/latest/ref/emr.html) proprio per questo problema

** Vantaggi **
* Piu' documentazione che qualunque altra libreria o framework
* Basta un codice contenente una singola classe per ogni job MapReduce
    * Map e Reduce sono dei singoli metodi
    * Molto pulito e semplice
* Configurazioni avanzate
    * steps multipli
    * i comandi da linea di comando possono essere gestiti all'interno del codice Python(vedere docs)
* Facile la gestione di input/output 
    * Non e' necessario copiare i dati con HDFS
* Gli errori e i warning vanno nell'outpur dello script 
* **Si puo' cambiare l'ambiente (HS, AWS,...)senza cambiare il codice changing the code...!**

**Svantaggi**

* il livello di accesso alle Hadoop APIs e' inferiore (rispetto HS per esmpio)
    - Notevoli concorrenti Dumbo e Pydoop
    - Altre librerie possono essere piu' veloci  can be faster if you use typedbytes

### Comparison
<img src='http://blog.cloudera.com/wp-content/uploads/2013/01/features.png'>

### Performance
<img src='http://blog.cloudera.com/wp-content/uploads/2013/01/performance.png'>

da: http://blog.cloudera.com/blog/2013/01/a-guide-to-python-frameworks-for-hadoop/

*nota finale*: 
> Open source is a great thing

I forum possono aiutare
https://github.com/Yelp/mrjob/issues/1142

# Fine Capitolo