Introduzione ad Hadoop

The article was moved to https://www.bernardi.cloud/2011/10/09/introduzione-ad-hadoop/

See you on my new website! 😎

About Paolo Bernardi

Registered software engineer with over 10 years' experience in software system design, cybersecurity and privacy expert, free and open‑source software enthusiast. Certified C2‑level English language user. I live in Acquasparta, an Umbrian town famous for being the seat of the first "Accademia dei Lincei" (Academy of the Lynx‑eyed), together with my beloved wife.
This entry was posted in italian, linux, my software and tagged , , , , , , . Bookmark the permalink.

36 Responses to Introduzione ad Hadoop

  1. Molto interessante ….. mi sto informando ultimamente su Hadoop e il tuo articolo mi è risultato essere molto utile ! ….. grazie 🙂

    Max

  2. Dba says:

    Grazie di cuore per le spiegazioni passo passo, in bocca al lupo.

  3. maria rosaria says:

    Grazie delle spiegazioni passo passo, sono risultate utilissime e funzionanti.

  4. Alessandro G. says:

    Ciao, perdonami..vorrei un attimo carpire in dettaglio una questione:
    un cluster, quindi, presenta più nodi?
    un cluster…inteso come cluster computazionale (come se fosse un elemento di una griglia per esempio un dispositivo mobile)?
    inoltre, un cluster, contiene all’interno dei dati omogenei…dove sono questi dati omogenei..o comunque..da chi sono rappresentati?

    Grazie mille.

    • Ciao. In generale un cluster (letteralmente grappolo… come le famigerate bombe) di computer è un insieme di computer, collegati tra loro tramite una rete, che lavorano a stretto contatto per un obbiettivo comune. In questo senso i computer connessi ad Internet o gli smartphone di una rete cellulare NON formano un cluster: sono connessi a una stessa rete, ma ognuno fa quel che gli pare. 🙂 Al contrario, i computer che partecipano al SETI@home, ad esempio, formano un cluster. Tuttavia generalmente i cluster sono connessi tramite rete locali ad alte prestazioni, per evitare colli di bottiglia nel trasferimento di dati tra i vari nodi (i computer del cluster).

      Poi c’è il discorso della Grid, utilizzata dal CERN e da altri istituti di ricerca: un “cluster di cluster”, al quale puoi inviare un programma da eseguire e le caratteristiche del cluster che ti serve: il sistema si occuperà di trovare un cluster rispondente alle tue esigenze e ad eseguirvi il tuo programma, nonché a restituirti i risultati del calcolo.

      Per quanto riguarda i dati, non è affatto detto che un cluster contenga dei dati omogenei. I cluster vengono utilizzati per velocizzare dei calcoli tramite la loro esecuzione in contemporanea su molteplici computer. Qualche volta è possibile ottenere questo parallelismo suddividendo la mole di dati da analizzare ed assegnandone una parte ai vari nodi: questo è quel che succede nei cluster come Hadoop e, in generale, MapReduce. Ad esempio si può usare questo approccio per generare i tasselli di una mappa (es. Google Maps): ciascun nodo del cluster può generare i tasselli di un’area geografica indipendentemente (e quindi contemporaneamente) agli altri. Problemi del genere vengono chiamati in gergo “embarassingly parallel”. Tuttavia esistono problemi che sono molto più ostici da “parallelizzare”, e che impediscono di distribuire “omogeneamente” i dati nel cluster; è comunque possibile lavorare in parallelo su dati di natura diversa, ogni volta che dei calcoli possono essere fatti indipendentemente, guadagnando così un po’ di tempo.

      • Alessandro G. says:

        Tutto chiaro per le info che mi hai dato.
        Nella fattispece, quindi, dato che la guida sopra citata è per single node suppongo che facendo riferimento a quello che hai scritto è come se il mio pc fosse “un elemento del cluster di computer” (spero di non sbagliarmi).

        In riferimento al framework Map/Reduce, adesso, come lavora sul mio pc? Da come ho capito da guida…dati i dati di ingresso si partizionano in M elementi del file system HDFS. C’è scritto sopra “tra più nodi del cluster”…ovvero: Il mio pc è un cluster di computer..quindi all’interno si screano dei nodi (corrispondenti alla dimensione del file system)? (questo per il map)

        Il Reduce, a sua volta, “raccoglierà” i record prodotti dal Map e li metterà insieme. Stiamo parlando sempre del single node (ovvero il mio pc).

        Perdonami se sono risultato confuso, è solo per capire a fondo. Spero di aver capito bene.

        Grazie

      • In effetti mi ero dimenticato di fare una precisazione: in un cluster Hadoop ci sono diversi “nodi”, ciascuno con un ruolo ben preciso (inventario dei nodi del cluster, storage dei dati, esecuzione dei job MapReduce ecc…). Hadoop ha molteplici processi software, uno per ciascun compito. Ora, questi processi possono essere distribuiti su computer diversi o, come nel caso di questo esempio, possono stare tutti in un singolo computer. La dicitura “nodo” in effetti è ambigua: più che ad un computer, pensa ad un “nodo logico”, contraddistinto dal suo compito. I “nodi logici” possono essere distribuiti secondo necessità su uno o più computer.

        In un cluster Hadoop di produzione ci saranno molti computer: uno, particolarmente controllato, che tiene l’inventario degli altri nodi del cluster, poi ci saranno molti nodi per lo storage e l’esecuzione dei job MapReduce. Tieni presente che spesso, per ragioni di efficenza, praticamente tutti i computer adibiti allo storage dei dati saranno anche adibiti anche all’esecuzione dei job MapReduce: in questo modo ciascun nodo “fisico” potrà memorizzare dati ed effettuare calcoli sugli stessi per evitare perdite di tempo dovute ai trasferimenti via rete (almeno fino ad un certo punto: alla fine sarà indispensabile effettuare un po’ di quei trasferimenti, il punto è cercare di minimizzarli).

  5. Alessandro G. says:

    Staordinario. Mi riservo di poterti ricontattare in seguito. Per ora è tutto chiaro. Se possibile anche privatamente. Grazie mille per il tuo aiuto.

  6. Alessandro G. says:

    Ciao Paolo, perdonami..avrei altre questioni su Multi-Node da porti se possibile.
    Innanzi tutto è il Master che si occupa di dividere in input in Partizioni e mandarla ai worker?
    Il Master, se vero quanto sopra scritto, in base a che “criterio” manda operazioni di Map e Operazioni di reduce ai worker?
    Ad un worker, può essere assegnata solo una operazione di map?
    Ad un worker, può essere assegnato solo una operazione di reduce?
    Ad un worker possono essere assegnate sia map che reduce?

    Grazie mille per il tuo aiuto in anticipo.

    • L’input (nell’esempio, i file) è già suddiviso in partizioni all’atto della sua memorizzazione nel filesystem distribuito di Hadoop, HDFS. I blocchi (di default, 64Mb) che compongono i vari file di input sono distribuiti all’interno del cluster e (se configurato) replicati per sicurezza.

      Nel caso di MapReduce con Hadoop chiamiamo Master il processo TaskTracker e Worker i processi JobTracker.

      Il criterio con cui il TaskTracker sceglie un JobTracker per una parte di un task MapReduce è quello della “vicinanza ai dati”: viene scelto il JobTracker che ha i dati in locale, o quantomeno il JobTracker che si trova sullo stesso rack. È un criterio molto semplice, talvolta considerato anche troppo semplicistico: lo sfruttamento della “vicinanza dei dati” è stato preferito ad altri criteri come il bilanciamento del carico di lavoro tra i vari nodi (almeno, nell’ambito di un singolo task MapReduce).

      I JobTracker eseguono operazioni sia di map che di reduce.
      C’è un numero massimo di operazioni (“slot”) che i JobTracker possono eseguire contemporaneamente. I JobTracker hanno un certo numero di “slot” liberi che rappresentano il numero massimo di operazioni che possono eseguire.

      • Alessandro G. says:

        Utilissimo come sempre.
        C’è una cosa che non mi è chiara dal tuo discorso:

        “viene scelto il JobTracker che ha i dati in locale, o quantomeno il JobTracker che si trova sullo stesso rack”

        Se stiamo partendo da un master (assumo che l’input sia sempre partizionato dal master..che poi è il filesystem di quest’ultimo a farlo da 64MB) come può un worker avere dei dati in locale (che presumo siano partizioni di input)? Glieli passa il master?
        Inoltre cosa si intende per “rack”?

        Grazie ancora.

      • Sul filesystem distribuito di Hadoop (HDFS) i dati che saranno elaborati tramite MapReduce sono sempre distribuiti tra i worker.
        I worker infatti:
        1) tengono i dati sui loro dischi;
        2) elaborano una parte delle operazioni di MapReduce su quei dati.

        Il “master” invece ha questi compiti:
        1) distribuisce i file che vengono caricati sul cluster tra i vari worker, dopo averli spezzettati in blocchi da 64MB (NON ne mantiene una copia in locale);
        2) mantiene una “mappa” di quali worker hanno quali blocchi, così da poter restituire ai client i file che richiedono;
        3) distribuisce i compiti MapReduce ai vari worker, secondo il principio di località dei dati (il master sa quali worker hanno quali dati)

        Il master è “l’interfaccia con il mondo”, tuttavia i dati ed i calcoli vengono effettuati quasi esclusivamente dai worker.

  7. Alessandro G. says:

    Ma circa i Worker che tengono i dati dati sui loro dischi…questi dati glieli passa il master sotto forma di blocchi da 64, giusto?

    • Si, giusto. E il master (in questo caso il processo NameNode) tiene anche traccia di quali worker (in questo caso DataNode) hanno i file.

      • Alessandro G. says:

        Ho notato, durante l’esecuzione del processo, che non inizierà mai il Reduce se prima non è terminato il Map.

        Quello che mi è chiaro è che, come giustamente mi hai detto, il map dice come fare il map e come fare il reduce. Il risultato di output (nel problema sopra presentato nella cartella output) è disponibile sulla macchina master.

        Se il map e reduce, insieme quindi, vengono fatti sui worker come è possibile che la macchina master abbia sulla propria cartella output il risultato del wordcount (o qualsiasi altro problema)?

        Si va a raccogliere i dati, una volta che map/reduce sono terminati, tutti i risultati dei worker?

        Ti chiedo ciò perché, da log sugli slave, ho notato che i job una volta presi in carico e processati vengono cancellati.

        Grazie.

      • Per quanto riguarda la sequenzialità di map e reduce, mi rifaccio a questa bella risposta: http://stackoverflow.com/questions/11672676/when-do-reduce-tasks-start-in-hadoop

        Lì viene riportato che l’operazione di reduce è suddivisa in tre fasi:

        1. shuffle: letteralmente dovrebbe significare “mescolare”, invece è la fase in cui i processi “reduce” raccolgono i dati dei processi “map”; questa fase avviene contemporaneamente al map.
        2. sort: è la fase in cui i risultati del map vengono aggregati per chiave; viene fatta in parte contemporaneamente allo shuffle, in parte dopo.
        3. reduce: è l’operazione di reduce vera e propria, che viene fatta soltanto dopo la fine del map (e, ovviamente, la fine di shuffle e sort)

        Questa scomposizione del reduce in tre fasi, con la conseguente possibilità di eseguire shuffle e parte del sort contemporaneamente al map (man mano che il map manda i risultati) consente di evitare di sovraccaricare la rete, cosa che avverrebbe se tutte e tre le fasi di reduce avvenissero alla fine del map.

        Per quanto riguarda l’output: in realtà l’output è memorizzato sul filesystem distribuito HDFS al pari dell’input. Output più grandi di quelli degli esempi giocattolo saranno sempre suddivisi nei famosi blocchi di 64MB e sparsi nei vari DataNode (e, ovviamente, indicizzati dal NameNode).

        Infatti per avere l’output sul tuo computer locale devi comunque richiederlo al filesystem distribuito HDFS tramite il comando hadoop fs -get. Credo che i dati cancellati dagli slave siano dati temporanei: in ogni caso l’output del MapReduce rimane sul filesystem distribuito.

  8. Alessandro G. says:

    Ciao Paolo, perdona ancora il disturbo.
    Ho una domanda tecnica da farti.
    Fino ad ora ho provato il multi-node su macchine virtuali. Adesso lo sto portando su macchine reali presso l’università.
    Ho un problema con l’ssh.
    Sebbene importi correttamente la chiave dallo slave (uso il singolare perché sistemato questo dovrei sistemare tutto) quando parto con bin/start-dfs.sh mi da problemi di questo tipo:
    Mi chiede l’accesso alle macchine, come anche accadeva su macchine virtuali.
    Tuttavia accade una cosa del genere (la mia macchina è alessandro@master, la macchina dell’uni è universita@slave_uno):
    ….
    …..
    alessandro@slave_uno:pass…
    Metto la password e non è corretta. Credo che dipenda proprio da questo problema poiché non va sia da se metto la pass del master sia quella dello slave. Difatti mi dovrei aspettare: universita@slave_uno e poi inserire la pass.

    Ho fatto riferimento a questa guida:
    http://www.michael-noll.com/tutorials/running-hadoop-on-ubuntu-linux-multi-node-cluster/
    (praticament euguale alla tua ma con piccoli accorgimenti.

    Ti ringrazio in anticipo.

  9. Alessandro G. says:

    Ciao Paolo, perdonami ancora.
    Il problema di cui sopra l’ho risolto semplicemente accedendo con lo stesso account utente. Nella fattispecie alessandro@master,alessandro@slave_uno,alessandro@slave_due ecc.
    Volevo farti alcune domande tecniche se me lo permetti.
    Nel file Slave di hadoop io vado a indicare chi sono gli host slave.

    Che significherebbe indicare in questo file oltre che gli slave, anche lo stesso master? Vuol dire che anche il master “lavora” come slave?

    Ho configurato la mia griglia all’università. Ho però la necessita (dato che gli ip cambiano ogni volta che accendo i pc) di riformattare tutti il file system di hadoop. E’ normale che accada ciò? O c’è un aspetto che ho trascurato?

    Molto spesso mi ricapita, per fare dei test, di eseguire lo stesso *.jar. Succede però che da errore hadoop in quanto è presente il job precendente. Ma non dovrebbe incrementarsi automaticamente?
    Inoltre, per la cartella, di input-output che vado a indicare è una sola oppure devo fare tante cartelle quanto sono i test che devo fare?

    Ti ringrazio infinitamente.

    Alessandro

    • Sarebbe bene che gli IP fossero sempre gli stessi.

      Se sul “master” fai girare anche i processi necessari agli “slave” questi può tranquillamente fungere da slave, si.

      Per quanto riguarda le cartelle di I/O, se l’input dei test è lo stesso non vedo perché non usare la stessa cartella. Per l’output invece è diverso: se la cartella di output esiste già, Hadoop si ferma per evitare di sovrascriverla. Forse questo è anche l’errore che hai quando rifai lo stesso test più volte?

  10. Alessandro G. says:

    Si esatto, proprio così. Volevo capire bene come estrapolare agevolmente i dati. Ad ogni modo, per come funge Hadoop, e per le nostre discussioni precenti è sintatticamente e semanticamente illogico far diventare “operativo” il master, giusto?

    Per quanto concerne la cartella di output, cosa mi consigli di fare? ricrearne una nuova ogni volta? o c’è qualche accorgimento da fare che mi è sfuggito?

    Alessandro

    • Si, il master è un punto critico dell’infrastruttura e non è mai sovraccacato di compiti da svolgere.

      Per quanto riguarda la cartella di output non conosco accorgimenti particolari: o la cancelli o cambi nome ogni volta.

  11. Alessandro G. says:

    Volevo capire anche un’altra cosa:

    Giustamente, nel master configuro il file /etc/hosts affiché possa conoscere chi sono gli slave.
    Questa operazione viene fatta anche sugli slave, come imposto.
    Ma perché gli slave devono conoscersi tra di loro? Perché io imposto:
    IP1 master
    IP2 slave_uno
    IP3 slave_due

    e lo faccio un tutti i pc. Mi domandavo se a questo punto, dato che mi è imposto, gli slave implicitamente si scambiano dati e di che natura sono gli scambi.

    Grazie.

  12. Alessandro G. says:

    Ciao Paolo perdona il disturbo.
    Come forse avrai capito sto facendo una tesi sperimentale su questo argomento. Devo importare e quindi distribuire una applicazione stand-alone.
    I dati in ingresso sono delle quintuple presenti in un db.
    Per ottenere queste quintuple c’è una fase di pre-processing all’algoritmo stesso.
    E’ possibile che questa fase di pre-processing possa essere distribuita così come lo è l’applicazione?
    Da come avevo capito i dati partono dal master e poi sono distribuiti sugli slave e quindi non è possibile fare pre-processing distribuito ma solo stand-alone (su master).
    Mentre il docente vorrebbe far si che le tuple e, quindi, i dati di input vengano distribuiti.

    Grazie mille.

    • Si, avevo vagamente intuito qualcosa del genere. 😉

      Dunque, la parte finale del discorso consiste nell’elaborare le quintuple sul cluster con MapReduce.

      A loro volta le quintuple sono ottenute da un pre-processing, ma quale forma hanno i dati di partenza, precisamente? Sono mappabili 1 a 1 con le quintuple (un dato di partenza -> una quintupla)?

      In linea di massima il pre-processing potrebbe essere fatto nella fase di Map, ma dipende dal tipo di operazione. Mandami qualche dettaglio in più, anche via email a bernarpa@gmail.com.

  13. Alessandro G. says:

    Paolo ti ho scritto per mail.
    Spero ti sia arrivato il tutto.

    Grazie.

  14. giorgio says:

    Ciao Paolo,
    ho alcune informazioni da chiederti.
    hai qualche consiglio per configurare degli ambienti di produzione/collaudo/sviluppo?
    supponiamo che in produzione/esercizio abbia la necessitá di avere i seguenti server:
    – 6 sistemi Fisici (Blade Server) con
    – 16 CPU-Core (Intel Xeon E5 2690)
    – 256 GB RAM

    come posso procedere con il dimensionamento degli ambienti di collaudo/sviluppo?
    qual´é la configurazione minima di server per eseguire Hadoop?

    avevo pensato:
    collaudo – 3 server per esecuzione Multi-Node
    sviluppo – 1 server con installazone Hadoop Single-Node

    che ne pensi?
    il porting delle procedure da single node verso multi node é trasparente?
    cosa cambia, performance a parte, dal single node al multi node ?
    ciao e grazie

    • Heylà!

      Per quanto riguarda il porting delle procedure, si, è trasparente.

      Per quanto riguarda il dimensionamento, sinceramente non ho esperienza al riguardo.
      In generale, per la scelta di un ambiente di produzione, ti direi di seguire i consigli riportati qui:

      http://hortonworks.com/blog/best-practices-for-selecting-apache-hadoop-hardware/

      A prescindere dagli ambienti di produzione/collaudo/sviluppo, non c’è bisogno di enormi potenze computazionali. L’importante è avere una buona RAM (sia quantità che qualità), dischi capienti e veloci (neanche troppo), connessione di rete decente (almeno Gigabit, niente di che).

      Per quanto riguarda l’ambiente di collaudo, la configurazione minima è un solo computer: dovrebbe essere una scelta sensata per lo sviluppo.

      Per il collaudo, tanto per prendere in considerazione la connessione di rete, forse può essere utile usare una configurazione con 1 nodo master (NameNode e JobTracker) e due o tre slave (ciascuno con TaskTracker + DataNode).

      • Giorgio says:

        Grazie per la risposta.
        Volevo chiederti se hai documentazione per porting/realizzazione di data warehouse su Hadoop. O in generale delle best pratices che possano aiutare nella progettazione si sistemi su tecnologia Hadoop
        Ti ringrazio.

      • Non ho niente del genere. Comunque la progettazione di un sistema basato su Hadoop è piuttosto standard, a meno che non arrivi ai livelli di Facebook & co. (svariati petabyte).

        Questo mi sembra un valido esempio di buone pratiche nell’uso di Hadoop:

        http://developer.yahoo.com/blogs/hadoop/posts/2010/08/apache_hadoop_best_practices_a/

        Principalmente: non essere eccessivi con la granularità di input, output e quel che c’è nel mezzo, nonché usare strumenti di alto livello come Pig o Hive quando possibile.

  15. Alessandro G. says:

    Ciao Paolo,
    una domanda ancora.

    E’ corretto dal tuo punto di vista che gli slave possano comunicare tra di loro?

    Grazie anticipatamente.

    • Hey!

      Facciamo nuovamente la precisazione sugli “slave”: ogni slave ha generalmente due compiti, la memorizzazione dei dati nelle vesti di DataNode e l’esecuzione di map e reduce nelle vesti di TaskTracker.

      I DataNode comunicano tra di loro per scambiarsi blocchi di dati nell’ambito delle operazioni di replica. Tuttavia, credo che tu ti riferisca alla computazione, ovvero agli slave nelle vesti di TaskTracker. In questo caso, la mancanza di comunicazione tra i vari nodi è un punto di forza dei claster di tipo MapReduce.

      Riporto un paragrafo da http://developer.yahoo.com/hadoop/tutorial/module1.html:

      <>

      Ovviamente la mancanza di comunicazione tra i TaskTracker è restrittiva: esistono operazioni la cui parallelizzazione la richiede necessariamente. In questo caso è certamente possibile far comunicare i TaskTracker tra di loro, con la consapevolezza però che si sta usando Hadoop in modo non “naturale” (per intenderci, lo si sta usando quasi come un’applicazione parallela ad-hoc basata su MPI, ad esempio).

  16. claudio says:

    Ciao Alessandro, ho il tuo stesso problema. magari se ci sentiamo per mail possiamo cercare di risolverlo insime. la mia mail è cla-85@hotmail.it

  17. claudio says:

    Ciao Paolo, devo chiederti una parere. Ho installato hadoop su due nodi, su due pc differenti.
    nessun problema nel configurarli, ho seguito le guide che stanno on line. Riesco ad avviare e terminare correttamente un job. Adesso i problemi sono due.(hadoop gira su due macchine, entrambi aventi xubuntu)
    – non riesco ad eseguire il comando jps, perverificarre i servizi attivi.
    – e non trovo un miglioramente, in termini di velocità nell’esecuzione del job. Non vorrei aver sbagliato qualcosa ed eseguo il job sempre su una macchina sola.
    io avvio, sempre tutto, dalla macchina master.(hdfs e mapreduce sulla macchina master e datanode e tasktraker sullo slave).
    Grazie.

  18. Ciao Paolo, volevo chiederti una cosa. Ho seguito la guida per installare hadoop. Quando inserisco la riga

    $ hadoop namenode -format

    mi da come risultato

    hadoop: comando non trovato

    Dove sbaglio?
    Grazie Angelo

Leave a reply to Angelo Calabrese Cancel reply