Hadoop - streaming

O streaming do Hadoop é um utilitário que vem com a distribuição do Hadoop. Este utilitário permite que você crie e execute trabalhos de Mapear / Reduzir com qualquer executável ou script como mapeador e / ou redutor.

Exemplo de uso de Python

Para streaming do Hadoop, estamos considerando o problema da contagem de palavras. Qualquer trabalho no Hadoop deve ter duas fases: mapeador e redutor. Escrevemos códigos para o mapeador e o redutor em script python para executá-lo no Hadoop. Também se pode escrever o mesmo em Perl e Ruby.

Código de fase do mapeador

!/usr/bin/python

import sys

# Input takes from standard input for myline in sys.stdin: 
   # Remove whitespace either side 
   myline = myline.strip() 

   # Break the line into words 
   words = myline.split() 

   # Iterate the words list
   for myword in words:
      # Write the results to standard output 
      print '%s\t%s' % (myword, 1)

Certifique-se de que este arquivo tenha permissão de execução (chmod + x / home / expert / hadoop-1.2.1 / mapper.py).

Código de fase do redutor

#!/usr/bin/python

from operator import itemgetter 
import sys 

current_word = ""
current_count = 0 
word = "" 

# Input takes from standard input for myline in sys.stdin: 
   # Remove whitespace either side 
   myline = myline.strip() 

   # Split the input we got from mapper.py word, 
   count = myline.split('\t', 1) 

   # Convert count variable to integer 
   try: 
      count = int(count) 

   except ValueError: 
      # Count was not a number, so silently ignore this line continue

   if current_word == word: 
   current_count += count 
   else: 
      if current_word: 
         # Write result to standard output print '%s\t%s' % (current_word, current_count) 
   
      current_count = count
      current_word = word

# Do not forget to output the last word if needed! 
if current_word == word: 
   print '%s\t%s' % (current_word, current_count)

Salve os códigos do mapeador e do redutor em mapper.py e reducer.py no diretório inicial do Hadoop. Certifique-se de que esses arquivos tenham permissão de execução (chmod + x mapper.py e chmod + x reducer.py). Como o python é sensível à indentação, o mesmo código pode ser baixado no link abaixo.

Execução do Programa WordCount

$ $HADOOP_HOME/bin/hadoop jar contrib/streaming/hadoop-streaming-1.
2.1.jar \
   -input input_dirs \ 
   -output output_dir \ 
   -mapper <path/mapper.py \ 
   -reducer <path/reducer.py

Onde "\" é usado para continuação de linha para legibilidade clara.

Por exemplo,

./bin/hadoop jar contrib/streaming/hadoop-streaming-1.2.1.jar -input myinput -output myoutput -mapper /home/expert/hadoop-1.2.1/mapper.py -reducer /home/expert/hadoop-1.2.1/reducer.py

Como funciona o streaming

No exemplo acima, tanto o mapeador quanto o redutor são scripts Python que lêem a entrada da entrada padrão e emitem a saída para a saída padrão. O utilitário criará um trabalho Mapear / Reduzir, enviará o trabalho a um cluster apropriado e monitorará o progresso do trabalho até que seja concluído.

Quando um script é especificado para mapeadores, cada tarefa do mapeador iniciará o script como um processo separado quando o mapeador for inicializado. Conforme a tarefa do mapeador é executada, ela converte suas entradas em linhas e as alimenta com a entrada padrão (STDIN) do processo. Nesse ínterim, o mapeador coleta as saídas orientadas por linha da saída padrão (STDOUT) do processo e converte cada linha em um par chave / valor, que é coletado como a saída do mapeador. Por padrão, o prefixo de uma linha até o primeiro caractere de tabulação é a chave e o resto da linha (excluindo o caractere de tabulação) será o valor. Se não houver nenhum caractere de tabulação na linha, a linha inteira será considerada a chave e o valor será nulo. No entanto, isso pode ser personalizado, conforme a necessidade.

Quando um script é especificado para redutores, cada tarefa do redutor iniciará o script como um processo separado e, em seguida, o redutor será inicializado. Conforme a tarefa do redutor é executada, ela converte seus pares de chave / valor de entrada em linhas e as alimenta com a entrada padrão (STDIN) do processo. Nesse ínterim, o redutor coleta as saídas orientadas por linha da saída padrão (STDOUT) do processo, converte cada linha em um par chave / valor, que é coletado como a saída do redutor. Por padrão, o prefixo de uma linha até o primeiro caractere de tabulação é a chave e o resto da linha (excluindo o caractere de tabulação) é o valor. No entanto, isso pode ser personalizado de acordo com requisitos específicos.

Comandos Importantes

Parâmetros Opções Descrição
- diretório de entrada / nome do arquivo Requeridos Local de entrada para mapeador.
- nome do diretório de saída Requeridos Local de saída do redutor.
-mapper executável ou script ou JavaClassName Requeridos Executável do mapeador.
-redutor executável ou script ou JavaClassName Requeridos Executável do redutor.
-file file-name Opcional Torna o mapeador, redutor ou combinador executável disponível localmente nos nós de computação.
-inputformat JavaClassName Opcional A classe fornecida deve retornar pares de chave / valor da classe Text. Se não for especificado, TextInputFormat é usado como o padrão.
-outputformat JavaClassName Opcional A classe fornecida deve ter pares de chave / valor da classe Text. Se não for especificado, TextOutputformat é usado como o padrão.
-partitioner JavaClassName Opcional Classe que determina para qual redução uma chave é enviada.
-combiner streamingCommand ou JavaClassName Opcional Executável combinador para saída de mapa.
-cmdenv name = value Opcional Passa a variável de ambiente para comandos de streaming.
-inputreader Opcional Para compatibilidade com versões anteriores: especifica uma classe de leitor de registro (em vez de uma classe de formato de entrada).
-verbose Opcional Saída detalhada.
-lazyOutput Opcional Cria a saída lentamente. Por exemplo, se o formato de saída for baseado em FileOutputFormat, o arquivo de saída será criado apenas na primeira chamada para output.collect (ou Context.write).
-numReduceTasks Opcional Especifica o número de redutores.
-mapdebug Opcional Script a ser chamado quando a tarefa do mapa falhar.
-reducedebug Opcional Script a ser chamado quando a tarefa de redução falhar.