MapReduce - Particionador
Um particionador funciona como uma condição no processamento de um conjunto de dados de entrada. A fase de partição ocorre após a fase de Mapa e antes da fase de Redução.
O número de particionadores é igual ao número de redutores. Isso significa que um particionador dividirá os dados de acordo com o número de redutores. Portanto, os dados passados de um único particionador são processados por um único Redutor.
Particionador
Um particionador particiona os pares chave-valor de saídas Map intermediárias. Ele particiona os dados usando uma condição definida pelo usuário, que funciona como uma função hash. O número total de partições é igual ao número de tarefas do Redutor para o trabalho. Vamos dar um exemplo para entender como funciona o particionador.
Implementação do Particionador MapReduce
Por uma questão de conveniência, vamos supor que temos uma pequena tabela chamada Funcionário com os seguintes dados. Usaremos esses dados de amostra como nosso conjunto de dados de entrada para demonstrar como o particionador funciona.
Eu iria | Nome | Era | Gênero | Salário |
---|---|---|---|---|
1201 | gopal | 45 | Masculino | 50.000 |
1202 | manisha | 40 | Fêmea | 50.000 |
1203 | Khalil | 34 | Masculino | 30.000 |
1204 | prasanth | 30 | Masculino | 30.000 |
1205 | Kiran | 20 | Masculino | 40.000 |
1206 | laxmi | 25 | Fêmea | 35.000 |
1207 | bhavya | 20 | Fêmea | 15.000 |
1208 | reshma | 19 | Fêmea | 15.000 |
1209 | Kranthi | 22 | Masculino | 22.000 |
1210 | Satish | 24 | Masculino | 25.000 |
1211 | Krishna | 25 | Masculino | 25.000 |
1212 | Arshad | 28 | Masculino | 20.000 |
1213 | Lavanya | 18 | Fêmea | 8.000 |
Temos que escrever um aplicativo para processar o conjunto de dados de entrada para encontrar o funcionário mais bem remunerado por gênero em diferentes grupos de idade (por exemplo, abaixo de 20, entre 21 a 30, acima de 30).
Dados de entrada
Os dados acima são salvos como input.txt no diretório “/ home / hadoop / hadoopPartitioner” e fornecido como entrada.
1201 | gopal | 45 | Masculino | 50000 |
1202 | manisha | 40 | Fêmea | 51000 |
1203 | khaleel | 34 | Masculino | 30000 |
1204 | prasanth | 30 | Masculino | 31.000 |
1205 | Kiran | 20 | Masculino | 40.000 |
1206 | laxmi | 25 | Fêmea | 35000 |
1207 | bhavya | 20 | Fêmea | 15.000 |
1208 | reshma | 19 | Fêmea | 14000 |
1209 | Kranthi | 22 | Masculino | 22000 |
1210 | Satish | 24 | Masculino | 25000 |
1211 | Krishna | 25 | Masculino | 26000 |
1212 | Arshad | 28 | Masculino | 20.000 |
1213 | Lavanya | 18 | Fêmea | 8000 |
Com base na entrada fornecida, a seguir está a explicação algorítmica do programa.
Tarefas de mapa
A tarefa de mapa aceita os pares de valores-chave como entrada enquanto temos os dados de texto em um arquivo de texto. A entrada para esta tarefa de mapa é a seguinte -
Input - A chave seria um padrão como “qualquer tecla especial + nome do arquivo + número da linha” (exemplo: chave = @ input1) e o valor seria os dados dessa linha (exemplo: valor = 1201 \ t gopal \ t 45 \ t Masculino \ t 50000).
Method - A operação desta tarefa de mapa é a seguinte -
Leia o value (dados de registro), que vem como valor de entrada da lista de argumentos em uma string.
Usando a função de divisão, separe o gênero e armazene em uma variável de string.
String[] str = value.toString().split("\t", -3);
String gender=str[3];
Envie as informações de gênero e os dados do registro value como par de valores-chave de saída da tarefa de mapa para o partition task.
context.write(new Text(gender), new Text(value));
Repita todas as etapas acima para todos os registros no arquivo de texto.
Output - Você obterá os dados de gênero e o valor dos dados de registro como pares de valores-chave.
Tarefa do Particionador
A tarefa do particionador aceita os pares de valores-chave da tarefa de mapa como sua entrada. A partição implica em dividir os dados em segmentos. De acordo com os critérios condicionais de partições fornecidos, os dados pareados de valor-chave de entrada podem ser divididos em três partes com base nos critérios de idade.
Input - Todos os dados em uma coleção de pares de valores-chave.
chave = valor do campo de gênero no registro.
valor = valor de dados de registro inteiro desse gênero.
Method - O processo de lógica de partição é executado da seguinte maneira.
- Leia o valor do campo de idade do par de valores-chave de entrada.
String[] str = value.toString().split("\t");
int age = Integer.parseInt(str[2]);
Verifique o valor da idade com as seguintes condições.
- Idade menor ou igual a 20
- Idade maior que 20 e menor ou igual a 30.
- Idade maior que 30.
if(age<=20)
{
return 0;
}
else if(age>20 && age<=30)
{
return 1 % numReduceTasks;
}
else
{
return 2 % numReduceTasks;
}
Output- Todos os dados dos pares de valores-chave são segmentados em três conjuntos de pares de valores-chave. O Redutor funciona individualmente em cada coleção.
Reduzir Tarefas
O número de tarefas do particionador é igual ao número de tarefas do redutor. Aqui, temos três tarefas de particionador e, portanto, temos três tarefas de Redutor a serem executadas.
Input - O Redutor será executado três vezes com coleção diferente de pares de valores-chave.
chave = valor do campo de gênero no registro.
valor = todos os dados de registro desse gênero.
Method - A seguinte lógica será aplicada em cada coleção.
- Leia o valor do campo Salário de cada registro.
String [] str = val.toString().split("\t", -3);
Note: str[4] have the salary field value.
Verifique o salário com a variável max. Se str [4] é o salário máximo, atribua str [4] a max, caso contrário, pule a etapa.
if(Integer.parseInt(str[4])>max)
{
max=Integer.parseInt(str[4]);
}
Repita as etapas 1 e 2 para cada coleção de chaves (Masculino e Feminino são as coleções de chaves). Depois de executar essas três etapas, você encontrará um salário máximo da coleção de chaves Masculino e um salário máximo da coleção de chaves Feminino.
context.write(new Text(key), new IntWritable(max));
Output- Finalmente, você obterá um conjunto de dados de pares de valores-chave em três coleções de diferentes grupos de idade. Ele contém o salário máximo da coleção Masculino e o salário máximo da coleção Feminino em cada faixa etária, respectivamente.
Depois de executar as tarefas Map, Partitioner e Reduce, as três coleções de dados do par de valores-chave são armazenadas em três arquivos diferentes como saída.
Todas as três tarefas são tratadas como trabalhos MapReduce. Os seguintes requisitos e especificações desses trabalhos devem ser especificados nas Configurações -
- Nome do trabalho
- Formatos de entrada e saída de chaves e valores
- Aulas individuais para tarefas Map, Reduce e Partitioner
Configuration conf = getConf();
//Create Job
Job job = new Job(conf, "topsal");
job.setJarByClass(PartitionerExample.class);
// File Input and Output paths
FileInputFormat.setInputPaths(job, new Path(arg[0]));
FileOutputFormat.setOutputPath(job,new Path(arg[1]));
//Set Mapper class and Output format for key-value pair.
job.setMapperClass(MapClass.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
//set partitioner statement
job.setPartitionerClass(CaderPartitioner.class);
//Set Reducer class and Input/Output format for key-value pair.
job.setReducerClass(ReduceClass.class);
//Number of Reducer tasks.
job.setNumReduceTasks(3);
//Input and Output format for data
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
Programa Exemplo
O programa a seguir mostra como implementar os particionadores para os critérios fornecidos em um programa MapReduce.
package partitionerexample;
import java.io.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;
import org.apache.hadoop.util.*;
public class PartitionerExample extends Configured implements Tool
{
//Map class
public static class MapClass extends Mapper<LongWritable,Text,Text,Text>
{
public void map(LongWritable key, Text value, Context context)
{
try{
String[] str = value.toString().split("\t", -3);
String gender=str[3];
context.write(new Text(gender), new Text(value));
}
catch(Exception e)
{
System.out.println(e.getMessage());
}
}
}
//Reducer class
public static class ReduceClass extends Reducer<Text,Text,Text,IntWritable>
{
public int max = -1;
public void reduce(Text key, Iterable <Text> values, Context context) throws IOException, InterruptedException
{
max = -1;
for (Text val : values)
{
String [] str = val.toString().split("\t", -3);
if(Integer.parseInt(str[4])>max)
max=Integer.parseInt(str[4]);
}
context.write(new Text(key), new IntWritable(max));
}
}
//Partitioner class
public static class CaderPartitioner extends
Partitioner < Text, Text >
{
@Override
public int getPartition(Text key, Text value, int numReduceTasks)
{
String[] str = value.toString().split("\t");
int age = Integer.parseInt(str[2]);
if(numReduceTasks == 0)
{
return 0;
}
if(age<=20)
{
return 0;
}
else if(age>20 && age<=30)
{
return 1 % numReduceTasks;
}
else
{
return 2 % numReduceTasks;
}
}
}
@Override
public int run(String[] arg) throws Exception
{
Configuration conf = getConf();
Job job = new Job(conf, "topsal");
job.setJarByClass(PartitionerExample.class);
FileInputFormat.setInputPaths(job, new Path(arg[0]));
FileOutputFormat.setOutputPath(job,new Path(arg[1]));
job.setMapperClass(MapClass.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
//set partitioner statement
job.setPartitionerClass(CaderPartitioner.class);
job.setReducerClass(ReduceClass.class);
job.setNumReduceTasks(3);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
System.exit(job.waitForCompletion(true)? 0 : 1);
return 0;
}
public static void main(String ar[]) throws Exception
{
int res = ToolRunner.run(new Configuration(), new PartitionerExample(),ar);
System.exit(0);
}
}
Salve o código acima como PartitionerExample.javaem “/ home / hadoop / hadoopPartitioner”. A compilação e execução do programa são fornecidas abaixo.
Compilação e execução
Vamos supor que estejamos no diretório inicial do usuário Hadoop (por exemplo, / home / hadoop).
Siga as etapas fornecidas a seguir para compilar e executar o programa acima.
Step 1- Baixe Hadoop-core-1.2.1.jar, que é usado para compilar e executar o programa MapReduce. Você pode baixar o jar em mvnrepository.com .
Vamos supor que a pasta baixada seja “/ home / hadoop / hadoopPartitioner”
Step 2 - Os seguintes comandos são usados para compilar o programa PartitionerExample.java e criando um jar para o programa.
$ javac -classpath hadoop-core-1.2.1.jar -d ProcessUnits.java
$ jar -cvf PartitionerExample.jar -C .
Step 3 - Use o seguinte comando para criar um diretório de entrada no HDFS.
$HADOOP_HOME/bin/hadoop fs -mkdir input_dir
Step 4 - Use o seguinte comando para copiar o arquivo de entrada chamado input.txt no diretório de entrada do HDFS.
$HADOOP_HOME/bin/hadoop fs -put /home/hadoop/hadoopPartitioner/input.txt input_dir
Step 5 - Use o seguinte comando para verificar os arquivos no diretório de entrada.
$HADOOP_HOME/bin/hadoop fs -ls input_dir/
Step 6 - Use o seguinte comando para executar o aplicativo Salário superior, obtendo arquivos de entrada do diretório de entrada.
$HADOOP_HOME/bin/hadoop jar PartitionerExample.jar partitionerexample.PartitionerExample input_dir/input.txt output_dir
Espere um pouco até que o arquivo seja executado. Após a execução, a saída contém várias divisões de entrada, tarefas de mapa e tarefas de Redutor.
15/02/04 15:19:51 INFO mapreduce.Job: Job job_1423027269044_0021 completed successfully
15/02/04 15:19:52 INFO mapreduce.Job: Counters: 49
File System Counters
FILE: Number of bytes read=467
FILE: Number of bytes written=426777
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=480
HDFS: Number of bytes written=72
HDFS: Number of read operations=12
HDFS: Number of large read operations=0
HDFS: Number of write operations=6
Job Counters
Launched map tasks=1
Launched reduce tasks=3
Data-local map tasks=1
Total time spent by all maps in occupied slots (ms)=8212
Total time spent by all reduces in occupied slots (ms)=59858
Total time spent by all map tasks (ms)=8212
Total time spent by all reduce tasks (ms)=59858
Total vcore-seconds taken by all map tasks=8212
Total vcore-seconds taken by all reduce tasks=59858
Total megabyte-seconds taken by all map tasks=8409088
Total megabyte-seconds taken by all reduce tasks=61294592
Map-Reduce Framework
Map input records=13
Map output records=13
Map output bytes=423
Map output materialized bytes=467
Input split bytes=119
Combine input records=0
Combine output records=0
Reduce input groups=6
Reduce shuffle bytes=467
Reduce input records=13
Reduce output records=6
Spilled Records=26
Shuffled Maps =3
Failed Shuffles=0
Merged Map outputs=3
GC time elapsed (ms)=224
CPU time spent (ms)=3690
Physical memory (bytes) snapshot=553816064
Virtual memory (bytes) snapshot=3441266688
Total committed heap usage (bytes)=334102528
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=361
File Output Format Counters
Bytes Written=72
Step 7 - Use o seguinte comando para verificar os arquivos resultantes na pasta de saída.
$HADOOP_HOME/bin/hadoop fs -ls output_dir/
Você encontrará a saída em três arquivos porque está usando três particionadores e três Redutores em seu programa.
Step 8 - Use o seguinte comando para ver a saída em Part-00000Arquivo. Este arquivo é gerado pelo HDFS.
$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000
Output in Part-00000
Female 15000
Male 40000
Use o seguinte comando para ver a saída em Part-00001 Arquivo.
$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00001
Output in Part-00001
Female 35000
Male 31000
Use o seguinte comando para ver a saída em Part-00002 Arquivo.
$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00002
Output in Part-00002
Female 51000
Male 50000