MapReduce con apache hadoop

Introducción

En esta entrada (y como continuación a Apache Hadoop) se empleará Apache Hadoop para realizar procesamiento de grandes cantidades de datos en sistemas de archivos distribuidos usando Spring y MapReduce.

Descripción

MapReduce es un framework (modelo de programación) utilizado para dar soporte a la computación paralela sobre grandes colecciones de datos en grupos de computadoras; es por esta razón por la que este framework suele ejecutarse en sistema de archivos distribuidos (HDFS).

Función Map()

Map toma uno de estos pares de datos con un tipo en un dominio de datos, y devuelve una lista de pares en un dominio diferente:

 

La función map(): se encarga del mapeo y es aplicada en paralelo para cada ítem en la entrada de datos. Esto produce una lista de pares (k2,v2) por cada llamada.

Función Reduce()

La función reduce es aplicada en paralelo para cada grupo, produciendo una colección de valores para cada dominio:

 

La función reduce(): cada llamada a Reduce típicamente produce un valor v3 o una llamada vacía, aunque una llamada puede retornar más de un valor.

Y a grandes rasgos, MapReduce transforma una lista de pares (clave, valor) en una lista de valores, los cuales serán procesados en paralelo dentro del sistema distribuidor; esta ejecución distribuida es administrada por el framework (en este caso hadoop), determinando que nodo puede ejecutar que proceso y en que momento, y este administrador se mantiene a la espera hasta que todos los nodos terminen sus ejecuciones para poder "unir" los resultados.

Vamos al ejemplo

Para mostrar la manera en que se puede delegar el proceso de grandes cantidades de datos a Hadoop, escribiremos código para procesar un archivo de textos (un libro en formato texto - o varios incluso -) , contara las palabras y nos entregará un resumen en el que se nos indicarán las palabras que contiene el archivo de texto ( o los archivos ), asi como la cantidad de veces que aparecen en este archivo (o los archivos); esto es equivalente a SELECT palabra, count(*) FROM tabla GROUP BY palabra en SQL.

Para esta tarea debemos escribir un Mapper y un Reducer:

- Mapper (SpringMapper.groovy)

 

Este mapper agrupara cada palabra en un "mapa" similar a Map y ya, el resto del mapping lo hará el framework.

- Reducer

 

Finalmente, el reducer se encarga de hacer el group by, obteniendo del iterador (que contiene ya todas las palabras generadas por el mapper) el count por cada palabra e incrementando en uno este contador.

Más detalles ...

Ahora, como funciona esto a mas detalle ? :

La función map() se ejecuta de forma distribuida a lo largo de varias máquinas. Los datos de entrada, procedentes por regla general de un gran archivo, se dividen en un conjunto de M particiones de entrada de generalmente 16 megabytes. Estas particiones pueden ser procesadas en diversas máquinas. En una invocación de MapReduce suelen ocurrir varias operaciones:

  • Se procede a dividir las entradas en en M particiones de tamaño aproximado de 64 megabytes. El programa MapReduce se comienza a instanciar en las diversas máquinas del cluster. Por regla general, el número de instancias se configura en las aplicaciones.
  • Una de las copias del programa es especial y toma el papel de "maestro". El resto de copias se denominan "workers" y reciben la asignación de sus tareas desde el master. Se considera que existen una cantidad de M map() tareas y de R reduce(). El "maestro" se encarga de recopilar "workers" en reposo (es decir sin tarea asignada) y le asignará una tarea específica de map() o de reduce(). Un worker sólo puede tener tres estados: reposo, trabajando, completo.
  • Un worker que tenga asignada una tarea específica de map() tomará como entrada la partición que le corresponda. Se dedicará a parsear los pares (clave, valor) para crear una nueva pareja de salida, tal y como se especifica en su programación. Los pares clave y valor producidos por la función map() se almacenan como buffer en la memoria.
  • Periodicamente, los pares clave-valor almacenados en el buffer se escriben en el disco local, repartidos en R regiones. Las regiones de estos pares clave-valor son pasados al master, que es responsable de redirigir a los "workers" que tienen tareas de reduce().
  • Cuando un worker de tipo reduce es notificado por el "maestro" con la localización de una partición, éste emplea llamadas remotas para hacer lecturas de la información almacenada en los discos duros de los diversos workers de tipo map(). cuando un worker de tipo reduce() lee todos los datos intermedios, ordena las claves de tal suerte que a se agrupan los datos encontrados que poseen la misma clave. El ordenamiento es necesario debido a que, por regla general, muchas claves de funciones map() diversas pueden ir a una misma función reduce(). En aquellos casos en los que la cantidad de datos intermedios sean muy grandes, se suele emplear un ordenamiento externo.
  • El worker de tipo reduce() itera sobre el conjunto de valores ordenados intermedios, y lo hace por cada una de las claves únicas encontradas. Toma la clave y el conjunto de valores asociados a ella y se los pasa a la función reduce(). La salida de reduce() se añade al archivo de salida de MapReduce.
  • Cuando todas las tareas map() y reduce() se han completado, el "maestro" levanta al programa del usuario. Llegados a este punto la llamada MapReduce retorna el control al código de un usuario.

Se considera que ha habido un final de las tareas cuando este control se ha devuelto al usuario. Las salidas se ditribuyen en un archivo completo, o en su defecto se reparten en R archivos. Estos R archivos pueden ser la entrada de otro MapReduce o puede ser procesado por cualquier otro programa que necesite estos datos.

Finalmente, y para hacer esto mas sencillo, le pediremos a Spring que nos ayude a correr esto en el sistema de archivos distribuido; para ello crearemos el archivo XML que contendrá la definición de beans necesarios :

 

Finalmente, debemos crear o subir los archivos a procesar en nuestro HDFS; para este ejemplo, podremos usar el "Quijote de la Mancha" () y "El manco de lepanto" ():

 

Finalmente, desde el IDE podemos correr nuestra clase principal (Main.groovy) :

 

Veremos algo asi :

 

En esta salida vemos que no tuvimos que indicarle que archivos procesar, mas bien, lee los dos archivos que encontro en el directorio 'input' que creamos; tambien vemos el proceso al hacer Map y el proceso al hacer Reduce.

Para ver el resultado debemos hacer :

 

En este ejemplo no se ve mucho diferencia de hacer un script en perl, sh o groovy, pero si se necesita procesar muchos registros (sin importar si vienen de un archivo de texto, o de una base de datos), se puede procesar en un cluster real hadoop y el procesamiento será notoriamente mas rapido.