Hadoop - streaming
O streaming do Hadoop é um utilitário que vem com a distribuição do Hadoop. Este utilitário permite que você crie e execute trabalhos de Mapear / Reduzir com qualquer executável ou script como mapeador e / ou redutor.
Exemplo de uso de Python
Para streaming do Hadoop, estamos considerando o problema da contagem de palavras. Qualquer trabalho no Hadoop deve ter duas fases: mapeador e redutor. Escrevemos códigos para o mapeador e o redutor em script python para executá-lo no Hadoop. Também se pode escrever o mesmo em Perl e Ruby.
Código de fase do mapeador
!/usr/bin/python
import sys
# Input takes from standard input for myline in sys.stdin:
# Remove whitespace either side
myline = myline.strip()
# Break the line into words
words = myline.split()
# Iterate the words list
for myword in words:
# Write the results to standard output
print '%s\t%s' % (myword, 1)
Certifique-se de que este arquivo tenha permissão de execução (chmod + x / home / expert / hadoop-1.2.1 / mapper.py).
Código de fase do redutor
#!/usr/bin/python
from operator import itemgetter
import sys
current_word = ""
current_count = 0
word = ""
# Input takes from standard input for myline in sys.stdin:
# Remove whitespace either side
myline = myline.strip()
# Split the input we got from mapper.py word,
count = myline.split('\t', 1)
# Convert count variable to integer
try:
count = int(count)
except ValueError:
# Count was not a number, so silently ignore this line continue
if current_word == word:
current_count += count
else:
if current_word:
# Write result to standard output print '%s\t%s' % (current_word, current_count)
current_count = count
current_word = word
# Do not forget to output the last word if needed!
if current_word == word:
print '%s\t%s' % (current_word, current_count)
Salve os códigos do mapeador e do redutor em mapper.py e reducer.py no diretório inicial do Hadoop. Certifique-se de que esses arquivos tenham permissão de execução (chmod + x mapper.py e chmod + x reducer.py). Como o python é sensível à indentação, o mesmo código pode ser baixado no link abaixo.
Execução do Programa WordCount
$ $HADOOP_HOME/bin/hadoop jar contrib/streaming/hadoop-streaming-1.
2.1.jar \
-input input_dirs \
-output output_dir \
-mapper <path/mapper.py \
-reducer <path/reducer.py
Onde "\" é usado para continuação de linha para legibilidade clara.
Por exemplo,
./bin/hadoop jar contrib/streaming/hadoop-streaming-1.2.1.jar -input myinput -output myoutput -mapper /home/expert/hadoop-1.2.1/mapper.py -reducer /home/expert/hadoop-1.2.1/reducer.py
Como funciona o streaming
No exemplo acima, tanto o mapeador quanto o redutor são scripts Python que lêem a entrada da entrada padrão e emitem a saída para a saída padrão. O utilitário criará um trabalho Mapear / Reduzir, enviará o trabalho a um cluster apropriado e monitorará o progresso do trabalho até que seja concluído.
Quando um script é especificado para mapeadores, cada tarefa do mapeador iniciará o script como um processo separado quando o mapeador for inicializado. Conforme a tarefa do mapeador é executada, ela converte suas entradas em linhas e as alimenta com a entrada padrão (STDIN) do processo. Nesse ínterim, o mapeador coleta as saídas orientadas por linha da saída padrão (STDOUT) do processo e converte cada linha em um par chave / valor, que é coletado como a saída do mapeador. Por padrão, o prefixo de uma linha até o primeiro caractere de tabulação é a chave e o resto da linha (excluindo o caractere de tabulação) será o valor. Se não houver nenhum caractere de tabulação na linha, a linha inteira será considerada a chave e o valor será nulo. No entanto, isso pode ser personalizado, conforme a necessidade.
Quando um script é especificado para redutores, cada tarefa do redutor iniciará o script como um processo separado e, em seguida, o redutor será inicializado. Conforme a tarefa do redutor é executada, ela converte seus pares de chave / valor de entrada em linhas e as alimenta com a entrada padrão (STDIN) do processo. Nesse ínterim, o redutor coleta as saídas orientadas por linha da saída padrão (STDOUT) do processo, converte cada linha em um par chave / valor, que é coletado como a saída do redutor. Por padrão, o prefixo de uma linha até o primeiro caractere de tabulação é a chave e o resto da linha (excluindo o caractere de tabulação) é o valor. No entanto, isso pode ser personalizado de acordo com requisitos específicos.
Comandos Importantes
Parâmetros | Opções | Descrição |
---|---|---|
- diretório de entrada / nome do arquivo | Requeridos | Local de entrada para mapeador. |
- nome do diretório de saída | Requeridos | Local de saída do redutor. |
-mapper executável ou script ou JavaClassName | Requeridos | Executável do mapeador. |
-redutor executável ou script ou JavaClassName | Requeridos | Executável do redutor. |
-file file-name | Opcional | Torna o mapeador, redutor ou combinador executável disponível localmente nos nós de computação. |
-inputformat JavaClassName | Opcional | A classe fornecida deve retornar pares de chave / valor da classe Text. Se não for especificado, TextInputFormat é usado como o padrão. |
-outputformat JavaClassName | Opcional | A classe fornecida deve ter pares de chave / valor da classe Text. Se não for especificado, TextOutputformat é usado como o padrão. |
-partitioner JavaClassName | Opcional | Classe que determina para qual redução uma chave é enviada. |
-combiner streamingCommand ou JavaClassName | Opcional | Executável combinador para saída de mapa. |
-cmdenv name = value | Opcional | Passa a variável de ambiente para comandos de streaming. |
-inputreader | Opcional | Para compatibilidade com versões anteriores: especifica uma classe de leitor de registro (em vez de uma classe de formato de entrada). |
-verbose | Opcional | Saída detalhada. |
-lazyOutput | Opcional | Cria a saída lentamente. Por exemplo, se o formato de saída for baseado em FileOutputFormat, o arquivo de saída será criado apenas na primeira chamada para output.collect (ou Context.write). |
-numReduceTasks | Opcional | Especifica o número de redutores. |
-mapdebug | Opcional | Script a ser chamado quando a tarefa do mapa falhar. |
-reducedebug | Opcional | Script a ser chamado quando a tarefa de redução falhar. |