# Hadoop
(introduzione tramite Hadoop streaming, notebook del CINECA Big DATA 2015 versione modificata da Paolo Avogadro)

<center>
<img src='http://www.opensourceforu.efytimes.com/wp-content/uploads/2012/03/hadoop-database-590x321.jpg'>
</center>

**MapReduce** e' un paradigma completamente differente rispetto a MPI o openMP

* MapReduce viene usato per risolvere un **sottoinsieme di problemi** parallelizzabili  
    - funziona per aggirare il collo di bottiglia dovuto **all'ingestione dei dati da disco** (analisi grandi moli di dati) 
* Con il parallelismo tradizionale i **dati** vengono spostati verso il nodo computazionale 
    - Map/reduce fa il contrario, definisce quali nodi calcolano **in funzione della posizione** dei dati

* i Dati vengono suddivisi (sharding) in piccoli pezzi (128 Mb per chunk per esempio) 
    - e sono stoccati sui vari nodi computazionali 

**MapReduce** e' un paradigma di programmazione che consente una scalabilita' **massiva** su migliaia di nodi 

La sua implementazione open source e' **Hadoop** (Hadoop 1.0 aveva un limite di 4000 nodi, ora questo limite e' stato  superato).


***HDFS*** e' una parte fondamentale per Hadoop (e per Spark, che ha scopi simili ma usa maggiormente la memoria invece che il disco)

* ridistribuisce i pezzi di dato ( tra i vari nodi )
* e' necessario per rendere efficenti le applicazioni di tipo mapreduce
* gestisce i nodi
* gestisce i dati in modo che siano sicuri (p.es. 3 repilche)
* gestisce la comunicazione tra nodi (p.es. la fase di shuffle di Mapreduce)

# una guida di Hadoop: da Java verso Python

1. un'occhiata veloce ad Hadoop Java
2. Comprendere come lanciare una job Hadoop Streaming 
5. Simuliamo Hadoop streaming con le "pipe" | di bash
6. Lanciamo Hadoop Streaming con Python

# Conteggio delle parole di un testo
E' l'`Hello World`' di MapReduce

Questo e' l'esempio che e' stato portato anche nell'articolo originale di Mapreduce

<img src='http://www.glennklockwood.com/data-intensive/hadoop/wordcount-schematic.png'
width='700'>
<small>***Moby Dick*** </small>

### Come funziona
* Il passo di **MAP**  prende il testo e lo converte in coppie **chiave/valore intermedie**
    - Ogni parola del testo diventa una chiave
    - TUTTE le chiavi (le parole del testo) hanno come valore 1


* Il passo di **REDUCE** accorpera' le chiavi duplicate: 
    - Si sommano tutti i valori associati alla **medesima chiave** 
    - l'**Output** diventa qundi una lista di coppie chiavi valore, in cui tutte le chiavi sono diverse tra loro
    - il valore corrispondente ad ogni chiave (parola) e' il numero di volte che la chiave stessa appare nel testo 

<center>
<img src='http://disco.readthedocs.org/en/latest/_images/map_shuffle_reduce.png' width=800>
</center>

### Funzione Map:
processa i dati e genera un insieme (bag) di coppie **chiave/valore** intermedie.


### Reduce function:
unisce tutti i **valori intermedi** associati con la **medesima chiave intermedia**.

## Nel dettaglio

Supponiamo che il file sia: 
```
Hello World Bye World
Hello Hadoop Goodbye Hadoop
```

La funzione di map legge ad una ad una le parole ed emette: 
```
(Hello, 1)
(World, 1)
(Bye, 1)
(World, 1)

(Hello, 1)
(Hadoop, 1)
(Goodbye, 1)
(Hadoop, 1)
```

La fase di shuffle crea una lista di valori associati ad ogni chiave 
```
(Bye, (1))
(Goodbye, (1))
(Hadoop, (1, 1))
(Hello, (1, 1))
(World, (1, 1))
```

La funzione di reduce somma i numeri della lista per ogni chiave ed emette coppie (parola, conteggio)
```
(Bye, 1)
(Goodbye, 1)
(Hadoop, 2)
(Hello, 2)
(World, 2)
```

## Ecco come farlo in Java!
(il linguaggio nativo di Hadoop)

``` Java
// Imports
package org.myorg;
import java.io.IOException;
import java.util.*;
import org.apache.hadoop.*

// Create JAVA class
public class WordCount {
```

``` Java
//Mapper function
  public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
      String line = value.toString();
      StringTokenizer tokenizer = new StringTokenizer(line);
      while (tokenizer.hasMoreTokens()) {
        word.set(tokenizer.nextToken());
        output.collect(word, one);
      }
    }
  }
```

``` Java
//Reducer function
  public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
    public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
      int sum = 0;
      while (values.hasNext()) {
        sum += values.next().get();
      }
      output.collect(key, new IntWritable(sum));
    }
  }
    
```

<small>
``` Java
//Main function
  public static void main(String[] args) throws Exception {
    JobConf conf = new JobConf(WordCount.class);
    conf.setJobName("wordcount");

    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(IntWritable.class);

    conf.setMapperClass(Map.class);
    conf.setCombinerClass(Reduce.class);
    conf.setReducerClass(Reduce.class);

    conf.setInputFormat(TextInputFormat.class);
    conf.setOutputFormat(TextOutputFormat.class);

    FileInputFormat.setInputPaths(conf, new Path(args[0]));
    FileOutputFormat.setOutputPath(conf, new Path(args[1]));

    JobClient.runJob(conf);
  }
```
</small>

# Hadoop ottenuto tramite le `pipe` di Unix

Prendiamo il testo "The prince" di Machiavelli (in inglese, non perche' anglofoni,
ma perche' ci sono meno problemi con gli accenti!).
 
**Utile**: provate a fare la stessa cosa con il vostro notebook

In [6]:
%%bash
pwd

/d/paoloD/Bicocca/CalcoloParalleloCorso/slidesHadoop/notebook2016


In [6]:
%%bash
# MapRedue sfruttando le  | di bash
 head -100 tmp/the_prince.txt  | awk ' {for (i=1;i<=NF;i++) {print $i}}' | sort | uniq -c
# head -100 tmp/cancella.txt  | awk ' {for (i=1;i<=NF;i++) {print $i}}' | sort | uniq -c


      1 '
      1 (OCT
      1 *
      1 *>
      2 ;
      1 ^943
      1 1469
      1 1527
      1 1903,
      1 1909
      1 1921.
      1 22,
      1 3,
      1 6
      4 a
      1 account
      1 accuse
      1 advantage
      2 advice
      1 aiid
      2 all
      1 always
      1 an
     12 and
      1 AND
      1 and,
      1 appreciation
      1 arts
      3 as
      1 Author's
      1 avowed
      2 been
      1 BOMBAY
      1 Born,
      1 brief
      1 but
      4 by
      3 BY
      1 CALCUTTA
      1 CAPETOWN
      1 City
      1 classical
      1 Classics
      1 Classics'
      1 could
      1 criticised
      1 defamed
      1 deliberate
      1 did
      1 Died,
      1 EDINBURGH
      1 edition
      1 elsewhere
      1 endeavour
      1 England
      2 English
      1 ENGLISH
      1 enslave
      1 Essay
      1 ever
      1 fault.
      1 first
      3 Florence
      1 Florentine
      1 for
      1 For
      1 free
      1 Garden
      1 GLASGOW
      1 good
   

### Cosa abbiamo fatto?

vediamo i vari comandi:

1.  `head -100 tmp/the_prince.txt  `

1.  `awk ' {for (i=1;i<NF;i++) {print $i}}' `

1.  `sort` 

1.  `uniq -c`

**INPUT STREAM**
`head -100 tmp/the_prince.txt`

**MAPPER**
`awk ' {for (i=1;i<NF;i++) {print $i}}'`

**SHUFFLE**
`sort`

**REDUCER**
`uniq -c`

**OUTPUT STREAM**
`<STDOUT>`

passo passo

In [8]:
# lo STREAMING del file  
! head -100 tmp/the_prince.txt
# prendo le prime 10 righe per vedere cosa succede

World's Classics 



XLIII 
THE PRINCE 

BY 

NICCOL6 MACHIAVELLI 



THE PRINCE 



BY 



NICCOL6 MACHIAVELLI 



TRANSLATED INTO ENGLISH BY 

LUIGI RICCI 




HUMPHREY MILFORD 

OXFORD UNIVERSITY PRESS 

LONDON EDINBURGH GLASGOW 

NEW YORK TORONTO MELBOURNE CAPETOWN 

BOMBAY CALCUTTA AND MADRAS 



NICCOLO MACHIAVELLI 

Born, Florence May 3, 1469 

Died, Florence June 22, 1527 

The present translation of Machiavelli's * Prince ' was 
first published in 'The World's Classics' in 1903, and 
reprinted in 1909 and 1921. 




(OCT I 6 ^943 



Printed in England by the Garden City Press, Letchworth. 



PREFACE 

*> 

OF all Machiavelli's works The Prince is undoubtedly 
the greatest ; aiid a new English edition of it is 
likely to he welcome to all those who have not the 
advantage of reading it in the classical Italian 
original. 

For a true appreciation of Machiavelli, impossible 
in a brief Preface, I must refer the English reader 
to Macaulay's Essay on the Italian historian and 


In [9]:
%%bash
# MAPPING
 head -5 tmp/the_prince.txt  | awk ' {for (i=1;i<=NF;i++) {print $i}}' 
 

World's
Classics
XLIII


In [8]:
%%bash
# SHUFFLING mettiamo le cose con lo stesso nome una dietro l'altra (in Mapreduce sono mandate allo stesso reducer)
head -100 tmp/the_prince.txt  | awk ' {for (i=1;i<=NF;i++) {print $i}}' | sort 

'
(OCT
*
*>
;
;
^943
1469
1527
1903,
1909
1921.
22,
3,
6
a
a
a
a
account
accuse
advantage
advice
advice
aiid
all
all
always
an
and
and
and
and
and
and
and
and
and
and
and
and
AND
and,
appreciation
arts
as
as
as
Author's
avowed
been
been
BOMBAY
Born,
brief
but
by
by
by
by
BY
BY
BY
CALCUTTA
CAPETOWN
City
classical
Classics
Classics'
could
criticised
defamed
deliberate
did
Died,
EDINBURGH
edition
elsewhere
endeavour
England
English
English
ENGLISH
enslave
Essay
ever
fault.
first
Florence
Florence
Florence
Florentine
for
For
free
Garden
GLASGOW
good
government,
great
great
greatest
had
have
have
have
he
he
he
he
he
heeded,
helping
him
his
his
his
his
his
his
his
his
his
historian
how
HUMPHREY
I
I
ideas
imagined.
impossible
in
in
in
in
in
in
in
in
in
in
In
indefatigable
ingratitude
INTO
inventor
is
is
is
it
it
it
it
Italian
Italian
Italy
June
Letchworth.
liberal
life
likely
listened
LONDON
lost
LUIGI
Macaulay's
MACHIAVELLI
MACHIAVELLI
MACHIAVELLI
Machiavelli,
Machiavelli's
Machiavelli's
Mac

In [11]:
%%bash
# REDUCER
head -100 tmp/the_prince.txt  | awk ' {for (i=1;i<=NF;i++) {print $i}}' |sort| uniq -c |sort

      1 '
      1 (OCT
      1 *
      1 *>
      1 ^943
      1 1469
      1 1527
      1 1903,
      1 1909
      1 1921.
      1 22,
      1 3,
      1 6
      1 account
      1 accuse
      1 advantage
      1 aiid
      1 always
      1 an
      1 AND
      1 and,
      1 appreciation
      1 arts
      1 Author's
      1 avowed
      1 BOMBAY
      1 Born,
      1 brief
      1 but
      1 CALCUTTA
      1 CAPETOWN
      1 City
      1 classical
      1 Classics
      1 Classics'
      1 could
      1 criticised
      1 defamed
      1 deliberate
      1 did
      1 Died,
      1 EDINBURGH
      1 edition
      1 elsewhere
      1 endeavour
      1 England
      1 ENGLISH
      1 enslave
      1 Essay
      1 ever
      1 fault.
      1 first
      1 Florentine
      1 for
      1 For
      1 free
      1 Garden
      1 GLASGOW
      1 good
      1 government,
      1 greatest
      1 had
      1 heeded,
      1 helping
      1 him
      1 historian
      1 how
      1 HUMPHREY
 

### Considerazioni riguardo all'uso delle *pipe* rispetto a MapReduce

* i passi sono seriali
* il file non e' distribuito su vari nodi...
* ...perche' c'e' un singolo nodo!
* singolo mapper
* singolo reducer
* e' possibile aggiungere un Combiner?

# Hadoop streaming
### Concetti e meccanismi

Hadoop streaming e' una utility che: 

* e' fornita in bundle con la distribuzione di Hadoop
* consente di creare e fare girare dei job Map/Reduce  
    - con **QUALSIASI**  eseguibile o script come mapper e/o reducer (non devo piu' scrivere i job map/reduce in JAVA)

I passi da eseguire: 

* Creare un job Map/Reduce 
* Lanciare il job in un cluster 
* Monitorare i progressi del job fino a completamento
* prendere i vari risultati nella dir

### Perche'?

Uno degli aspetti meno piacevoli di Hadoop (dal punto di vista degli utilizzatori di HPC) e' che e' scritto in *Java*.

* Java non e' stato originariamente pensato per essere un linguaggio high-performance
* Gli esperti di dominio fanno fatica ad imparare Java 

Questo e' il motivo per cui Hadoop consente di scrivere le funzioni map/reduce in qualunque linguaggio e utilizzarle tramite  
Hadoop Streaming 

* Si puo' qundi trasformare uno script in Python, Bash, Perl, etc in un job Hadoop
* non si deve imparare Java

### MapReduce streaming con binari ed eseguibili

* Gli eseguibili sono specificati per i mapper e i reducer (combiner)!
    - ogni compito di un mapper gira come un processo indipendente  
* L'input e' convertito in linee e passato allo `STDIN` del processo
* Il mapper riceve lo  `STDOUT` del processo (Hadoop Streaming) 
    - ogni linea e' una coppia dove **chiave e valore** sono separate da un **TAB** e terminano con una **newline** \n
    - per esempio ”this is the key\tvalue is the rest\n”

### ATTENZIONE!!! Se non c'e' il TAB all'interno della linea passata al mapper, allora l'intera linea e' considerata la chiave e il valore e' nullo!!!

<big> 
Vediamo come usare:
</big> 

Un esempio di chiamata ad Hadoop Streaming:
``` bash
$ hadoop jar $HADOOP_HOME/hadoop-streaming.jar \
    -input myInputDirs \
    -output myOutputDir \
    -mapper org.apache.hadoop.mapred.lib.IdentityMapper \
    -reducer /bin/wc
```

Se usassi dei codici **python** inventati da me:
``` bash
$ hadoop jar $HADOOP_HOME/hadoop-streaming.jar \
    -files mapper.py,reducer.py
    -input input_dir/ \
    -output output_dir/ \
    -mapper mapper.py \
    -reducer reducer.py \
```

prima di lanciare un job Hadoop Streaming e' bene:

* controllare che non ci siano errori negli script
* controllare che mapper e reducer facciano esattamente il lavoro voluto

Per esempio controllando il tutto con una piccola quanita' di dati,

come con i comandi `cat` o `head`, con le **pipe**, visti precedentemente:

```
$ cat $file | python mapper.py | sort | python reducer.py
```

Vediamo un job che prende parti di un file biologico

In [9]:
%%bash
# vediamo come e' fatto il file, a noi interessa contare quante volte ogni chrM appare nel file, le linee con @ sono da saltare
 head -100 ngs.sam

@HD	VN:1.4	GO:none	SO:coordinate
@SQ	SN:chrM	LN:16571
@SQ	SN:chr1	LN:249250621
@SQ	SN:chr2	LN:243199373
@SQ	SN:chr3	LN:198022430
@SQ	SN:chr4	LN:191154276
@SQ	SN:chr5	LN:180915260
@SQ	SN:chr6	LN:171115067
@SQ	SN:chr7	LN:159138663
@SQ	SN:chr8	LN:146364022
@SQ	SN:chr9	LN:141213431
@SQ	SN:chr10	LN:135534747
@SQ	SN:chr11	LN:135006516
@SQ	SN:chr12	LN:133851895
@SQ	SN:chr13	LN:115169878
@SQ	SN:chr14	LN:107349540
@SQ	SN:chr15	LN:102531392
@SQ	SN:chr16	LN:90354753
@SQ	SN:chr17	LN:81195210
@SQ	SN:chr18	LN:78077248
@SQ	SN:chr19	LN:59128983
@SQ	SN:chr20	LN:63025520
@SQ	SN:chr21	LN:48129895
@SQ	SN:chr22	LN:51304566
@SQ	SN:chrX	LN:155270560
@SQ	SN:chrY	LN:59373566
@SQ	SN:chr1_gl000191_random	LN:106433
@SQ	SN:chr1_gl000192_random	LN:547496
@SQ	SN:chr4_ctg9_hap1	LN:590426
@SQ	SN:chr4_gl000193_random	LN:189789
@SQ	SN:chr4_gl000194_random	LN:191469
@SQ	SN:chr6_apd_hap1	LN:4622290
@SQ	SN:chr6_cox_hap2	LN:4795371
@SQ	SN:chr6_dbb_hap3	LN:4610396
@SQ	SN:chr6_mann_hap4	LN:4683263
@SQ	SN:chr6_mcf_hap5	LN:4833

In [94]:
%%writefile mapper.py

# -*- coding: utf-8 -*-
import sys

for line in sys.stdin:         # prende tutti gli stdin 
    line = line.strip()        # .strip e' un metodo che toglie gli spazi all'inizio e alla fine 
    pieces = line.split('\t')  # crea una lista dove ogni ingresso sono i pezzi separati da TAB della linea
    print(pieces)              # stampa la lista delle parole... occhio che lo fa per ogni input (ottenuto in streaming)


Overwriting mapper.py


Vediamo il mapper completo:

In [95]:
%%writefile mapper.py   

# -*- coding: utf-8 -*-
TAB = "\t"
import sys

# Cycle current streaming data
for line in sys.stdin:

    # Clean input
    line = line.strip()
    # Skip SAM/BAM headers
    if line[0] == "@":
        continue

    # Use data
    pieces = line.split(TAB)
    mychr = pieces[2]
    mystart = int(pieces[3])
    myseq = pieces[9]
    print(mychr,mystart.__str__())
    sys.exit(1)

Overwriting mapper.py


In [3]:
! head -n 100 ngs.sam | python mapper.py

chrM:14	1
chrM:15	1
chrM:16	1
chrM:17	1
chrM:18	1
chrM:19	1
chrM:20	1
chrM:21	1
chrM:22	1
chrM:23	1
chrM:24	1
chrM:25	1
chrM:26	1
chrM:27	1
chrM:28	1
chrM:29	1
chrM:30	1
chrM:31	1
chrM:32	1
chrM:33	1
chrM:34	1
chrM:35	1
chrM:36	1
chrM:37	1
chrM:38	1
chrM:39	1
chrM:40	1
chrM:41	1
chrM:42	1
chrM:43	1
chrM:44	1
chrM:45	1
chrM:46	1
chrM:47	1
chrM:48	1
chrM:49	1
chrM:50	1
chrM:51	1
chrM:52	1
chrM:53	1
chrM:54	1
chrM:55	1
chrM:56	1
chrM:57	1
chrM:58	1
chrM:59	1
chrM:60	1
chrM:61	1
chrM:62	1
chrM:63	1
chrM:64	1
chrM:65	1
chrM:66	1
chrM:67	1
chrM:68	1
chrM:69	1
chrM:70	1
chrM:71	1
chrM:72	1
chrM:73	1
chrM:74	1
chrM:75	1
chrM:76	1
chrM:77	1
chrM:78	1
chrM:79	1
chrM:80	1
chrM:81	1
chrM:82	1
chrM:83	1
chrM:84	1
chrM:85	1
chrM:86	1
chrM:87	1
chrM:88	1
chrM:14	1
chrM:15	1
chrM:16	1
chrM:17	1
chrM:18	1
chrM:19	1
chrM:20	1
chrM:21	1
chrM:22	1
chrM:23	1
chrM:24	1
chrM:25	1
chrM:26	1
chrM:27	1
chrM:28	1
chrM:29	1
chrM:30	1
chrM:31	1
chrM:32	1
chrM:33	1
chrM:34	1
chrM:35	1
chrM:36	1
chrM:37	1
chrM:38	1


Ora produciamo un mapper che emetta coppie formattate per il reducer:

In [97]:
%%writefile mapper.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
TAB = "\t"
SEP = ':'
import sys

# Cycle current streaming data
for line in sys.stdin:
    # Clean input
    line = line.strip()
    # Skip SAM/BAM headers
    if line[0] == "@":          # evita queste linee
        continue
    
    # Use data
    pieces = line.split(TAB)    # separa i pezzi
    mychr = pieces[2]
    mystart = int(pieces[3])
    myseq = pieces[9]

    mystop = mystart + len(myseq)

    # Each element with coverage
    for i in range(mystart,mystop):
        results = [mychr+SEP+i.__str__(), "1"]
        print(TAB.join(results))


Overwriting mapper.py


In [2]:
! head -n 100 ngs.sam | python mapper.py | tail

chrM:84	1
chrM:85	1
chrM:86	1
chrM:87	1
chrM:88	1
chrM:89	1
chrM:90	1
chrM:91	1
chrM:92	1
chrM:93	1


### Shuffle 

<br><big>
Il bello di Hadoop e' che ci sono molti lati del lavoro che sono completamente trasparenti al programmatore: HDFS si occupa di tutto. Infatti si vuole proprio evitare che il programmatore diventi matto a gestire la comunicazione tra nodi.
</big>

In questo esempio:

* L'output dei mapper e' trasformato e distribuito ai reduce
* Tutte le coppie key/value sono **ordinate correttamente** prima di essere mandate ai riduttori
* Coppie con la stessa chiave vengono mandate allo stesso riduttore
* Se si incontra una chiave che e' differente da quella precedente: 
    - *siamo sicuri che la precedente non apparira' piu' (perche' le abbiamo ordinate)
* Se tutte le chiavi sono identiche
    - un solo reducer viene usato, non si guadagna in parallelismo
    - se questo succede e' importane costruire chiavi intermedie differenti

### Reducer

In [10]:
%%writefile reducer.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
TAB = "\t"
SEP = ':'
import sys
last_value = ""              # stringa vuota
value_count = 1              # conteggio di valore = 1 
for line in sys.stdin:       # prende tutte le linee dello standard input: in ogni riga ci sono "key TAB value\n"
    value, count = line.strip().split(TAB)       # il primo elemento e' value, il secondo e' count
    # if this is the first iteration
    if not last_value:            # ricorda che vuoto = falso
        last_value = value        # definisci last value 
    # if they're the same, log it
    if value == last_value:       # se il valore attuale e' uguale a quello vecchio 
        value_count += int(count) # aumenta il numero di conteggi di count
    else:
        # state change
        try: 
            print(TAB.join([last_value, str(value_count)]))  # .join attacca last_value e value_cont tramite tab
        except:
            pass
        last_value = value
        value_count = 1
# LAST ONE after all records have been received
print(TAB.join([last_value, str(value_count)]))

Overwriting reducer.py


In [11]:
%%bash
# needs ~ 5 seconds for running      # qui il comando sort fa il lavoro di shuffling
time head -n 100 ngs.sam | python mapper.py | sort | python reducer.py | head -n 5

chrM:14	3
chrM:15	2
chrM:16	2
chrM:17	2
chrM:18	2



real	0m0.219s
user	0m0.076s
sys	0m0.356s


# Esercizio

Scrivere un mapper e un reducer in python per raggruppare le parole del "Principe" basate sulla
loro lunghezza.

Trovare quante parole ci sono piu' lunghe di 10 lettere o piu' corte di 5.

# verso il vero Hadoop

<big>
Un codice python che funge con le pipe **dovrebbe fungere** anche con Hadoop Streaming
</big>

* Per fare si' che questo succeda abbiamo bisogno di usare dei file che sono all'interno dell' Hadoop File System
* Nell' HDFS troveremo anche i log del job tracker 
* Proviamo ad usare bash scripting nel notebook 

## Preprocessing

I comandi HDFS interagiscono con l'Hadoop file system con la sintassi:
```
hdfs dfs -command
```

`command` sono come i comandi della bash

e.g.

```
hadoop dfs -mkdir hdfs:///dir
hadoop dfs -put file_on_host hdfs:///path/to/file
hadoop dfs -ls
```

<big>
Hadoop Streaming ha bisogno di “file binari” (o eseguibili)
</big>

Bisogna specificare l'interprete all'inizio dello script:
```
#!/usr/bin/env python
```

E resi eseguibili con:
```
chmod +x hs*.py
```

In [103]:
! chmod +x mapper.py reducer.py

In [104]:
%env HADOOP_STREAMING

UsageError: Environment does not have key: HADOOP_STREAMING


In [None]:
%%bash
# Launch streaming
hadoop jar $HADOOP_STREAMING

In [None]:
%%bash
# Preprocess with HDFS
hdfs dfs -rm -r -f myinput
hdfs dfs -mkdir myinput
# Save one file inside
file="/tmp/ngs.sam"
hdfs dfs -put $file myinput/file01
# Remove output or Hadoop will give error if existing
hdfs dfs -rm -r -f myoutput

Lancio finale tramite il comando bash per usare Hadoop streaming

In [None]:
%%bash 
# A real Hadoop Streaming run
time hadoop jar $HADOOP_STREAMING \
    -D mapreduce.job.mapper=12 -D mapreduce.job.reducers=4  \
    -files mapper.py,reducer.py \
    -input myinput -output myoutput \
    -mapper mapper.py -reducer reducer.py

<small>
**Nota 1**:

Se il comando necessita di minuti per andare a completamento si puo' vedere cosa succede nel Hadoop JobTracker:
http://localhost:8088/cluster

<br>


**Note 2**:

Questo comando non puo' essere eseguito dalla versione cloud dei notebook, perche' la porta non e' aperta
</small>

Hadoop streaming e' **difficile da debuggare**.
Proprio come il vero Java Hadoop.

Quando si commettono degli errori  di setup, si possono ricevere degli errori che dipendono invece dalla Java virtual machine.

Prima di googlare questo "stacktrace" e' meglio controllare che:

* i file Python  (mapper e reducer) esistano
* Che siano all'interno della main bash anche come una  **lista di file**
* Che siano eseguibili e che contengano la prima linea come hasbang 
* Che la dir di input esista sull' HDFS
* Che i file all'interno della directory di input non siano corrotti 
    - e.g. bad decompression

In [None]:
# OUTPUT: Check directory
! hdfs dfs -ls myoutput

In [None]:
# OUTPUT:  Copy file and go see it
! rm -rf hs.*.txt && hdfs dfs -get myoutput/part-00000 hs.out.txt

# Esercizio

Contare simboli (qualunque cosa che non sia una lettera dell'alfabeto) in un file di testo.

# Exercizio

Fare girare i Mapper/Reducer di python con Hadoop Streaming.

(*buona fortuna*)

## Pensieri finali su Hadoop streaming 


* Consente di scrivere job MapReduce jobs in altri linguaggi
* O persino degli eseguibili
* Veloce
* piu' semplice che in Java


### Quando e' particolarmente utile?

* Quando lo sviluppatore non conosce Java 
* per scrivere Mapper/Reducer con linguaggi di scripting 

### Svantaggi

* forza gli script in una Java VM
    - Anche se quasi non ha overhead
* I programmi/eseguibili devono prendere l'input dallo STDIN 
    - e produrre output nello STDOUT
* Ci sono restrizioni riguardo ai formati di input/output 
    - non si occupa della preparazione dei file di input/output o delle directory 
    - l'utente deve quindi usare i propri comandi hdfs  per gestire i dati

### Limiti

* Non e' un modo di lavorare Pythonico

(non e' stato scritto specificatamente per python)

## Riassunto

* Hadoop streaming fa girare Hadoop quasi nel modo classico 
* Wrappa qualsiadi eseguibile(e script)
* Puo' girare su un cluster usando  non-interactive, all-encapsulated job

# End of Chapter