Hadoop - MapReduce

MapReduce é uma estrutura com a qual podemos escrever aplicativos para processar grandes quantidades de dados, em paralelo, em grandes clusters de hardware comum de maneira confiável.

O que é MapReduce?

MapReduce é uma técnica de processamento e um modelo de programa para computação distribuída baseado em java. O algoritmo MapReduce contém duas tarefas importantes, ou seja, Mapear e Reduzir. O mapa pega um conjunto de dados e o converte em outro conjunto de dados, onde os elementos individuais são divididos em tuplas (pares de chave / valor). Em segundo lugar, reduza a tarefa, que pega a saída de um mapa como uma entrada e combina essas tuplas de dados em um conjunto menor de tuplas. Como a sequência do nome MapReduce indica, a tarefa de redução é sempre executada após o trabalho de mapa.

A principal vantagem do MapReduce é que é fácil dimensionar o processamento de dados em vários nós de computação. No modelo MapReduce, os primitivos de processamento de dados são chamados de mapeadores e redutores. Decompor um aplicativo de processamento de dados em mapeadores e redutores às vezes não é trivial. Mas, uma vez que escrevemos um aplicativo no formato MapReduce, escalar o aplicativo para rodar em centenas, milhares ou mesmo dezenas de milhares de máquinas em um cluster é apenas uma mudança de configuração. Essa escalabilidade simples é o que atraiu muitos programadores a usar o modelo MapReduce.

O Algoritmo

  • Geralmente o paradigma MapReduce é baseado no envio do computador para onde os dados residem!

  • O programa MapReduce é executado em três estágios, ou seja, estágio de mapa, estágio de embaralhamento e estágio de redução.

    • Map stage- O trabalho do mapa ou mapeador é processar os dados de entrada. Geralmente, os dados de entrada estão na forma de arquivo ou diretório e são armazenados no sistema de arquivos Hadoop (HDFS). O arquivo de entrada é passado para a função do mapeador linha por linha. O mapeador processa os dados e cria vários pequenos blocos de dados.

    • Reduce stage - Este estágio é a combinação do Shuffle palco e o Reducepalco. O trabalho do Redutor é processar os dados que vêm do mapeador. Após o processamento, ele produz um novo conjunto de saída, que será armazenado no HDFS.

  • Durante um trabalho MapReduce, o Hadoop envia as tarefas Mapear e Reduzir para os servidores apropriados no cluster.

  • A estrutura gerencia todos os detalhes da passagem de dados, como emissão de tarefas, verificação da conclusão da tarefa e cópia de dados em todo o cluster entre os nós.

  • A maior parte da computação ocorre em nós com dados em discos locais que reduzem o tráfego de rede.

  • Após a conclusão das tarefas fornecidas, o cluster coleta e reduz os dados para formar um resultado apropriado e os envia de volta ao servidor Hadoop.

Entradas e saídas (perspectiva Java)

A estrutura MapReduce opera em pares <chave, valor>, ou seja, a estrutura visualiza a entrada para a tarefa como um conjunto de pares <chave, valor> e produz um conjunto de pares <chave, valor> como a saída da tarefa , possivelmente de tipos diferentes.

A chave e as classes de valor devem ser serializadas pela estrutura e, portanto, precisam implementar a interface gravável. Além disso, as classes principais precisam implementar a interface gravável-comparável para facilitar a classificação pela estrutura. Tipos de entrada e saída de umMapReduce job - (Entrada) <k1, v1> → mapa → <k2, v2> → reduzir → <k3, v3> (Saída).

Entrada Resultado
Mapa <k1, v1> lista (<k2, v2>)
Reduzir <k2, lista (v2)> lista (<k3, v3>)

Terminologia

  • PayLoad - Os aplicativos implementam as funções Map e Reduce e formam o núcleo do trabalho.

  • Mapper - Mapeador mapeia os pares de chave / valor de entrada para um conjunto de par de chave / valor intermediário.

  • NamedNode - Nó que gerencia o Hadoop Distributed File System (HDFS).

  • DataNode - Nó onde os dados são apresentados com antecedência antes que qualquer processamento ocorra.

  • MasterNode - Nó onde o JobTracker é executado e que aceita solicitações de trabalho de clientes.

  • SlaveNode - Nó onde o programa Map e Reduce é executado.

  • JobTracker - Agenda tarefas e rastreia as tarefas atribuídas ao rastreador de tarefas.

  • Task Tracker - Rastreia a tarefa e informa o status ao JobTracker.

  • Job - Um programa é a execução de um Mapeador e Redutor em um conjunto de dados.

  • Task - Uma execução de um Mapeador ou Redutor em uma fatia de dados.

  • Task Attempt - Uma instância particular de uma tentativa de executar uma tarefa em um SlaveNode.

Cenário de Exemplo

A seguir estão os dados relativos ao consumo elétrico de uma organização. Contém o consumo elétrico mensal e a média anual de vários anos.

Jan Fev Mar Abr Maio Junho Jul Agosto Set Out Nov Dez Média
1979 23 23 2 43 24 25 26 26 26 26 25 26 25
1980 26 27 28 28 28 30 31 31 31 30 30 30 29
1981 31 32 32 32 33 34 35 36 36 34 34 34 34
1984 39 38 39 39 39 41 42 43 40 39 38 38 40
1985 38 39 39 39 39 41 41 41 00 40 39 39 45

Se os dados acima forem fornecidos como entrada, temos que escrever aplicativos para processá-los e produzir resultados como encontrar o ano de uso máximo, ano de uso mínimo e assim por diante. Esta é uma vitória fácil para os programadores com um número finito de registros. Eles simplesmente escreverão a lógica para produzir a saída necessária e passarão os dados para o aplicativo escrito.

Mas, pense nos dados que representam o consumo elétrico de todas as indústrias de grande porte de um determinado estado, desde sua formação.

Quando escrevemos aplicativos para processar esses dados em massa,

  • Eles levarão muito tempo para serem executados.

  • Haverá um tráfego de rede pesado quando movermos os dados da origem para o servidor de rede e assim por diante.

Para resolver esses problemas, temos a estrutura MapReduce.

Dados de entrada

Os dados acima são salvos como sample.txte fornecido como entrada. O arquivo de entrada se parece com o mostrado abaixo.

1979   23   23   2   43   24   25   26   26   26   26   25   26  25 
1980   26   27   28  28   28   30   31   31   31   30   30   30  29 
1981   31   32   32  32   33   34   35   36   36   34   34   34  34 
1984   39   38   39  39   39   41   42   43   40   39   38   38  40 
1985   38   39   39  39   39   41   41   41   00   40   39   39  45

Programa Exemplo

A seguir está o programa para os dados de amostra usando a estrutura MapReduce.

package hadoop; 

import java.util.*; 

import java.io.IOException; 
import java.io.IOException; 

import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.conf.*; 
import org.apache.hadoop.io.*; 
import org.apache.hadoop.mapred.*; 
import org.apache.hadoop.util.*; 

public class ProcessUnits {
   //Mapper class 
   public static class E_EMapper extends MapReduceBase implements 
   Mapper<LongWritable ,/*Input key Type */ 
   Text,                /*Input value Type*/ 
   Text,                /*Output key Type*/ 
   IntWritable>        /*Output value Type*/ 
   {
      //Map function 
      public void map(LongWritable key, Text value, 
      OutputCollector<Text, IntWritable> output,   
      
      Reporter reporter) throws IOException { 
         String line = value.toString(); 
         String lasttoken = null; 
         StringTokenizer s = new StringTokenizer(line,"\t"); 
         String year = s.nextToken(); 
         
         while(s.hasMoreTokens()) {
            lasttoken = s.nextToken();
         }
         int avgprice = Integer.parseInt(lasttoken); 
         output.collect(new Text(year), new IntWritable(avgprice)); 
      } 
   }
   
   //Reducer class 
   public static class E_EReduce extends MapReduceBase implements Reducer< Text, IntWritable, Text, IntWritable > {
   
      //Reduce function 
      public void reduce( Text key, Iterator <IntWritable> values, 
      OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { 
         int maxavg = 30; 
         int val = Integer.MIN_VALUE; 
            
         while (values.hasNext()) { 
            if((val = values.next().get())>maxavg) { 
               output.collect(key, new IntWritable(val)); 
            } 
         }
      } 
   }

   //Main function 
   public static void main(String args[])throws Exception { 
      JobConf conf = new JobConf(ProcessUnits.class); 
      
      conf.setJobName("max_eletricityunits"); 
      conf.setOutputKeyClass(Text.class);
      conf.setOutputValueClass(IntWritable.class); 
      conf.setMapperClass(E_EMapper.class); 
      conf.setCombinerClass(E_EReduce.class); 
      conf.setReducerClass(E_EReduce.class); 
      conf.setInputFormat(TextInputFormat.class); 
      conf.setOutputFormat(TextOutputFormat.class); 
      
      FileInputFormat.setInputPaths(conf, new Path(args[0])); 
      FileOutputFormat.setOutputPath(conf, new Path(args[1])); 
      
      JobClient.runJob(conf); 
   } 
}

Salve o programa acima como ProcessUnits.java. A compilação e execução do programa são explicadas a seguir.

Programa de Compilação e Execução de Unidades de Processo

Vamos supor que estamos no diretório inicial de um usuário Hadoop (por exemplo, / home / hadoop).

Siga as etapas fornecidas a seguir para compilar e executar o programa acima.

Passo 1

O comando a seguir é para criar um diretório para armazenar as classes java compiladas.

$ mkdir units

Passo 2

Baixar Hadoop-core-1.2.1.jar,que é usado para compilar e executar o programa MapReduce. Visite o seguinte link mvnrepository.com para baixar o jar. Vamos supor que a pasta baixada seja/home/hadoop/.

etapa 3

Os seguintes comandos são usados ​​para compilar o ProcessUnits.java programa e criando um jar para o programa.

$ javac -classpath hadoop-core-1.2.1.jar -d units ProcessUnits.java 
$ jar -cvf units.jar -C units/ .

Passo 4

O comando a seguir é usado para criar um diretório de entrada no HDFS.

$HADOOP_HOME/bin/hadoop fs -mkdir input_dir

Etapa 5

O seguinte comando é usado para copiar o arquivo de entrada chamado sample.txtno diretório de entrada do HDFS.

$HADOOP_HOME/bin/hadoop fs -put /home/hadoop/sample.txt input_dir

Etapa 6

O comando a seguir é usado para verificar os arquivos no diretório de entrada.

$HADOOP_HOME/bin/hadoop fs -ls input_dir/

Etapa 7

O comando a seguir é usado para executar o aplicativo Eleunit_max, obtendo os arquivos de entrada do diretório de entrada.

$HADOOP_HOME/bin/hadoop jar units.jar hadoop.ProcessUnits input_dir output_dir

Espere um pouco até que o arquivo seja executado. Após a execução, conforme mostrado abaixo, a saída conterá o número de divisões de entrada, o número de tarefas de mapa, o número de tarefas de redutor, etc.

INFO mapreduce.Job: Job job_1414748220717_0002 
completed successfully 
14/10/31 06:02:52 
INFO mapreduce.Job: Counters: 49 
   File System Counters 
 
FILE: Number of bytes read = 61 
FILE: Number of bytes written = 279400 
FILE: Number of read operations = 0 
FILE: Number of large read operations = 0   
FILE: Number of write operations = 0 
HDFS: Number of bytes read = 546 
HDFS: Number of bytes written = 40 
HDFS: Number of read operations = 9 
HDFS: Number of large read operations = 0 
HDFS: Number of write operations = 2 Job Counters 


   Launched map tasks = 2  
   Launched reduce tasks = 1 
   Data-local map tasks = 2  
   Total time spent by all maps in occupied slots (ms) = 146137 
   Total time spent by all reduces in occupied slots (ms) = 441   
   Total time spent by all map tasks (ms) = 14613 
   Total time spent by all reduce tasks (ms) = 44120 
   Total vcore-seconds taken by all map tasks = 146137 
   Total vcore-seconds taken by all reduce tasks = 44120 
   Total megabyte-seconds taken by all map tasks = 149644288 
   Total megabyte-seconds taken by all reduce tasks = 45178880 
   
Map-Reduce Framework 
 
   Map input records = 5  
   Map output records = 5   
   Map output bytes = 45  
   Map output materialized bytes = 67  
   Input split bytes = 208 
   Combine input records = 5  
   Combine output records = 5 
   Reduce input groups = 5  
   Reduce shuffle bytes = 6  
   Reduce input records = 5  
   Reduce output records = 5  
   Spilled Records = 10  
   Shuffled Maps  = 2  
   Failed Shuffles = 0  
   Merged Map outputs = 2  
   GC time elapsed (ms) = 948  
   CPU time spent (ms) = 5160  
   Physical memory (bytes) snapshot = 47749120  
   Virtual memory (bytes) snapshot = 2899349504  
   Total committed heap usage (bytes) = 277684224
     
File Output Format Counters 
 
   Bytes Written = 40

Etapa 8

O comando a seguir é usado para verificar os arquivos resultantes na pasta de saída.

$HADOOP_HOME/bin/hadoop fs -ls output_dir/

Etapa 9

O seguinte comando é usado para ver a saída em Part-00000 Arquivo. Este arquivo é gerado pelo HDFS.

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000

Abaixo está a saída gerada pelo programa MapReduce.

1981    34 
1984    40 
1985    45

Etapa 10

O comando a seguir é usado para copiar a pasta de saída do HDFS para o sistema de arquivos local para análise.

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000/bin/hadoop dfs get output_dir /home/hadoop

Comandos Importantes

Todos os comandos do Hadoop são invocados pelo $HADOOP_HOME/bin/hadoopcomando. A execução do script Hadoop sem nenhum argumento imprime a descrição de todos os comandos.

Usage - hadoop [--config confdir] COMANDO

A tabela a seguir lista as opções disponíveis e suas descrições.

Sr. Não. Opção e descrição
1

namenode -format

Formata o sistema de arquivos DFS.

2

secondarynamenode

Executa o namenode secundário DFS.

3

namenode

Executa o namenode DFS.

4

datanode

Executa um datanode DFS.

5

dfsadmin

Executa um cliente de administração DFS.

6

mradmin

Executa um cliente de administração Map-Reduce.

7

fsck

Executa um utilitário de verificação do sistema de arquivos DFS.

8

fs

Executa um cliente de usuário de sistema de arquivos genérico.

9

balancer

Executa um utilitário de balanceamento de cluster.

10

oiv

Aplica o visualizador fsimage offline a uma fsimage.

11

fetchdt

Busca um token de delegação do NameNode.

12

jobtracker

Executa o nó do rastreador de trabalhos MapReduce.

13

pipes

Executa um trabalho de Pipes.

14

tasktracker

Executa um nó do rastreador de tarefas MapReduce.

15

historyserver

Executa servidores de histórico de tarefas como um daemon independente.

16

job

Manipula os trabalhos MapReduce.

17

queue

Obtém informações sobre JobQueues.

18

version

Imprime a versão.

19

jar <jar>

Executa um arquivo jar.

20

distcp <srcurl> <desturl>

Copia arquivos ou diretórios recursivamente.

21

distcp2 <srcurl> <desturl>

DistCp versão 2.

22

archive -archiveName NAME -p <parent path> <src>* <dest>

Cria um arquivo hadoop.

23

classpath

Imprime o caminho da classe necessário para obter o jar do Hadoop e as bibliotecas necessárias.

24

daemonlog

Obter / definir o nível de log para cada daemon

Como interagir com trabalhos MapReduce

Uso - trabalho hadoop [GENERIC_OPTIONS]

A seguir estão as opções genéricas disponíveis em uma tarefa do Hadoop.

Sr. Não. GENERIC_OPTION & Description
1

-submit <job-file>

Envia o trabalho.

2

-status <job-id>

Imprime o mapa e reduz a porcentagem de conclusão e todos os contadores de trabalho.

3

-counter <job-id> <group-name> <countername>

Imprime o valor do contador.

4

-kill <job-id>

Mata o trabalho.

5

-events <job-id> <fromevent-#> <#-of-events>

Imprime os detalhes dos eventos recebidos pelo jobtracker para o intervalo fornecido.

6

-history [all] <jobOutputDir> - history < jobOutputDir>

Imprime detalhes do trabalho, detalhes de dicas falhadas e eliminadas. Mais detalhes sobre o trabalho, como tarefas bem-sucedidas e tentativas de tarefa feitas para cada tarefa, podem ser exibidos especificando a opção [todos].

7

-list[all]

Exibe todos os trabalhos. -list exibe apenas os trabalhos que ainda não foram concluídos.

8

-kill-task <task-id>

Mata a tarefa. As tarefas eliminadas NÃO são contabilizadas nas tentativas malsucedidas.

9

-fail-task <task-id>

Falha na tarefa. As tarefas com falha são contadas em relação às tentativas com falha.

10

-set-priority <job-id> <priority>

Altera a prioridade do trabalho. Os valores de prioridade permitidos são VERY_HIGH, HIGH, NORMAL, LOW, VERY_LOW

Para ver o status do trabalho

$ $HADOOP_HOME/bin/hadoop job -status <JOB-ID> 
e.g. 
$ $HADOOP_HOME/bin/hadoop job -status job_201310191043_0004

Para ver o histórico do job output-dir

$ $HADOOP_HOME/bin/hadoop job -history <DIR-NAME> 
e.g. 
$ $HADOOP_HOME/bin/hadoop job -history /user/expert/output

Para matar o trabalho

$ $HADOOP_HOME/bin/hadoop job -kill <JOB-ID> 
e.g. 
$ $HADOOP_HOME/bin/hadoop job -kill job_201310191043_0004