MapReduce - Particionador

Um particionador funciona como uma condição no processamento de um conjunto de dados de entrada. A fase de partição ocorre após a fase de Mapa e antes da fase de Redução.

O número de particionadores é igual ao número de redutores. Isso significa que um particionador dividirá os dados de acordo com o número de redutores. Portanto, os dados passados ​​de um único particionador são processados ​​por um único Redutor.

Particionador

Um particionador particiona os pares chave-valor de saídas Map intermediárias. Ele particiona os dados usando uma condição definida pelo usuário, que funciona como uma função hash. O número total de partições é igual ao número de tarefas do Redutor para o trabalho. Vamos dar um exemplo para entender como funciona o particionador.

Implementação do Particionador MapReduce

Por uma questão de conveniência, vamos supor que temos uma pequena tabela chamada Funcionário com os seguintes dados. Usaremos esses dados de amostra como nosso conjunto de dados de entrada para demonstrar como o particionador funciona.

Eu iria Nome Era Gênero Salário
1201 gopal 45 Masculino 50.000
1202 manisha 40 Fêmea 50.000
1203 Khalil 34 Masculino 30.000
1204 prasanth 30 Masculino 30.000
1205 Kiran 20 Masculino 40.000
1206 laxmi 25 Fêmea 35.000
1207 bhavya 20 Fêmea 15.000
1208 reshma 19 Fêmea 15.000
1209 Kranthi 22 Masculino 22.000
1210 Satish 24 Masculino 25.000
1211 Krishna 25 Masculino 25.000
1212 Arshad 28 Masculino 20.000
1213 Lavanya 18 Fêmea 8.000

Temos que escrever um aplicativo para processar o conjunto de dados de entrada para encontrar o funcionário mais bem remunerado por gênero em diferentes grupos de idade (por exemplo, abaixo de 20, entre 21 a 30, acima de 30).

Dados de entrada

Os dados acima são salvos como input.txt no diretório “/ home / hadoop / hadoopPartitioner” e fornecido como entrada.

1201 gopal 45 Masculino 50000
1202 manisha 40 Fêmea 51000
1203 khaleel 34 Masculino 30000
1204 prasanth 30 Masculino 31.000
1205 Kiran 20 Masculino 40.000
1206 laxmi 25 Fêmea 35000
1207 bhavya 20 Fêmea 15.000
1208 reshma 19 Fêmea 14000
1209 Kranthi 22 Masculino 22000
1210 Satish 24 Masculino 25000
1211 Krishna 25 Masculino 26000
1212 Arshad 28 Masculino 20.000
1213 Lavanya 18 Fêmea 8000

Com base na entrada fornecida, a seguir está a explicação algorítmica do programa.

Tarefas de mapa

A tarefa de mapa aceita os pares de valores-chave como entrada enquanto temos os dados de texto em um arquivo de texto. A entrada para esta tarefa de mapa é a seguinte -

Input - A chave seria um padrão como “qualquer tecla especial + nome do arquivo + número da linha” (exemplo: chave = @ input1) e o valor seria os dados dessa linha (exemplo: valor = 1201 \ t gopal \ t 45 \ t Masculino \ t 50000).

Method - A operação desta tarefa de mapa é a seguinte -

  • Leia o value (dados de registro), que vem como valor de entrada da lista de argumentos em uma string.

  • Usando a função de divisão, separe o gênero e armazene em uma variável de string.

String[] str = value.toString().split("\t", -3);
String gender=str[3];
  • Envie as informações de gênero e os dados do registro value como par de valores-chave de saída da tarefa de mapa para o partition task.

context.write(new Text(gender), new Text(value));
  • Repita todas as etapas acima para todos os registros no arquivo de texto.

Output - Você obterá os dados de gênero e o valor dos dados de registro como pares de valores-chave.

Tarefa do Particionador

A tarefa do particionador aceita os pares de valores-chave da tarefa de mapa como sua entrada. A partição implica em dividir os dados em segmentos. De acordo com os critérios condicionais de partições fornecidos, os dados pareados de valor-chave de entrada podem ser divididos em três partes com base nos critérios de idade.

Input - Todos os dados em uma coleção de pares de valores-chave.

chave = valor do campo de gênero no registro.

valor = valor de dados de registro inteiro desse gênero.

Method - O processo de lógica de partição é executado da seguinte maneira.

  • Leia o valor do campo de idade do par de valores-chave de entrada.
String[] str = value.toString().split("\t");
int age = Integer.parseInt(str[2]);
  • Verifique o valor da idade com as seguintes condições.

    • Idade menor ou igual a 20
    • Idade maior que 20 e menor ou igual a 30.
    • Idade maior que 30.
if(age<=20)
{
   return 0;
}
else if(age>20 && age<=30)
{
   return 1 % numReduceTasks;
}
else
{
   return 2 % numReduceTasks;
}

Output- Todos os dados dos pares de valores-chave são segmentados em três conjuntos de pares de valores-chave. O Redutor funciona individualmente em cada coleção.

Reduzir Tarefas

O número de tarefas do particionador é igual ao número de tarefas do redutor. Aqui, temos três tarefas de particionador e, portanto, temos três tarefas de Redutor a serem executadas.

Input - O Redutor será executado três vezes com coleção diferente de pares de valores-chave.

chave = valor do campo de gênero no registro.

valor = todos os dados de registro desse gênero.

Method - A seguinte lógica será aplicada em cada coleção.

  • Leia o valor do campo Salário de cada registro.
String [] str = val.toString().split("\t", -3);
Note: str[4] have the salary field value.
  • Verifique o salário com a variável max. Se str [4] é o salário máximo, atribua str [4] a max, caso contrário, pule a etapa.

if(Integer.parseInt(str[4])>max)
{
   max=Integer.parseInt(str[4]);
}
  • Repita as etapas 1 e 2 para cada coleção de chaves (Masculino e Feminino são as coleções de chaves). Depois de executar essas três etapas, você encontrará um salário máximo da coleção de chaves Masculino e um salário máximo da coleção de chaves Feminino.

context.write(new Text(key), new IntWritable(max));

Output- Finalmente, você obterá um conjunto de dados de pares de valores-chave em três coleções de diferentes grupos de idade. Ele contém o salário máximo da coleção Masculino e o salário máximo da coleção Feminino em cada faixa etária, respectivamente.

Depois de executar as tarefas Map, Partitioner e Reduce, as três coleções de dados do par de valores-chave são armazenadas em três arquivos diferentes como saída.

Todas as três tarefas são tratadas como trabalhos MapReduce. Os seguintes requisitos e especificações desses trabalhos devem ser especificados nas Configurações -

  • Nome do trabalho
  • Formatos de entrada e saída de chaves e valores
  • Aulas individuais para tarefas Map, Reduce e Partitioner
Configuration conf = getConf();

//Create Job
Job job = new Job(conf, "topsal");
job.setJarByClass(PartitionerExample.class);

// File Input and Output paths
FileInputFormat.setInputPaths(job, new Path(arg[0]));
FileOutputFormat.setOutputPath(job,new Path(arg[1]));

//Set Mapper class and Output format for key-value pair.
job.setMapperClass(MapClass.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);

//set partitioner statement
job.setPartitionerClass(CaderPartitioner.class);

//Set Reducer class and Input/Output format for key-value pair.
job.setReducerClass(ReduceClass.class);

//Number of Reducer tasks.
job.setNumReduceTasks(3);

//Input and Output format for data
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);

Programa Exemplo

O programa a seguir mostra como implementar os particionadores para os critérios fornecidos em um programa MapReduce.

package partitionerexample;

import java.io.*;

import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;

import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;

import org.apache.hadoop.util.*;

public class PartitionerExample extends Configured implements Tool
{
   //Map class
	
   public static class MapClass extends Mapper<LongWritable,Text,Text,Text>
   {
      public void map(LongWritable key, Text value, Context context)
      {
         try{
            String[] str = value.toString().split("\t", -3);
            String gender=str[3];
            context.write(new Text(gender), new Text(value));
         }
         catch(Exception e)
         {
            System.out.println(e.getMessage());
         }
      }
   }
   
   //Reducer class
	
   public static class ReduceClass extends Reducer<Text,Text,Text,IntWritable>
   {
      public int max = -1;
      public void reduce(Text key, Iterable <Text> values, Context context) throws IOException, InterruptedException
      {
         max = -1;
			
         for (Text val : values)
         {
            String [] str = val.toString().split("\t", -3);
            if(Integer.parseInt(str[4])>max)
            max=Integer.parseInt(str[4]);
         }
			
         context.write(new Text(key), new IntWritable(max));
      }
   }
   
   //Partitioner class
	
   public static class CaderPartitioner extends
   Partitioner < Text, Text >
   {
      @Override
      public int getPartition(Text key, Text value, int numReduceTasks)
      {
         String[] str = value.toString().split("\t");
         int age = Integer.parseInt(str[2]);
         
         if(numReduceTasks == 0)
         {
            return 0;
         }
         
         if(age<=20)
         {
            return 0;
         }
         else if(age>20 && age<=30)
         {
            return 1 % numReduceTasks;
         }
         else
         {
            return 2 % numReduceTasks;
         }
      }
   }
   
   @Override
   public int run(String[] arg) throws Exception
   {
      Configuration conf = getConf();
		
      Job job = new Job(conf, "topsal");
      job.setJarByClass(PartitionerExample.class);
		
      FileInputFormat.setInputPaths(job, new Path(arg[0]));
      FileOutputFormat.setOutputPath(job,new Path(arg[1]));
		
      job.setMapperClass(MapClass.class);
		
      job.setMapOutputKeyClass(Text.class);
      job.setMapOutputValueClass(Text.class);
      
      //set partitioner statement
		
      job.setPartitionerClass(CaderPartitioner.class);
      job.setReducerClass(ReduceClass.class);
      job.setNumReduceTasks(3);
      job.setInputFormatClass(TextInputFormat.class);
		
      job.setOutputFormatClass(TextOutputFormat.class);
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(Text.class);
		
      System.exit(job.waitForCompletion(true)? 0 : 1);
      return 0;
   }
   
   public static void main(String ar[]) throws Exception
   {
      int res = ToolRunner.run(new Configuration(), new PartitionerExample(),ar);
      System.exit(0);
   }
}

Salve o código acima como PartitionerExample.javaem “/ home / hadoop / hadoopPartitioner”. A compilação e execução do programa são fornecidas abaixo.

Compilação e execução

Vamos supor que estejamos no diretório inicial do usuário Hadoop (por exemplo, / home / hadoop).

Siga as etapas fornecidas a seguir para compilar e executar o programa acima.

Step 1- Baixe Hadoop-core-1.2.1.jar, que é usado para compilar e executar o programa MapReduce. Você pode baixar o jar em mvnrepository.com .

Vamos supor que a pasta baixada seja “/ home / hadoop / hadoopPartitioner”

Step 2 - Os seguintes comandos são usados ​​para compilar o programa PartitionerExample.java e criando um jar para o programa.

$ javac -classpath hadoop-core-1.2.1.jar -d ProcessUnits.java
$ jar -cvf PartitionerExample.jar -C .

Step 3 - Use o seguinte comando para criar um diretório de entrada no HDFS.

$HADOOP_HOME/bin/hadoop fs -mkdir input_dir

Step 4 - Use o seguinte comando para copiar o arquivo de entrada chamado input.txt no diretório de entrada do HDFS.

$HADOOP_HOME/bin/hadoop fs -put /home/hadoop/hadoopPartitioner/input.txt input_dir

Step 5 - Use o seguinte comando para verificar os arquivos no diretório de entrada.

$HADOOP_HOME/bin/hadoop fs -ls input_dir/

Step 6 - Use o seguinte comando para executar o aplicativo Salário superior, obtendo arquivos de entrada do diretório de entrada.

$HADOOP_HOME/bin/hadoop jar PartitionerExample.jar partitionerexample.PartitionerExample input_dir/input.txt output_dir

Espere um pouco até que o arquivo seja executado. Após a execução, a saída contém várias divisões de entrada, tarefas de mapa e tarefas de Redutor.

15/02/04 15:19:51 INFO mapreduce.Job: Job job_1423027269044_0021 completed successfully
15/02/04 15:19:52 INFO mapreduce.Job: Counters: 49

File System Counters

   FILE: Number of bytes read=467
   FILE: Number of bytes written=426777
   FILE: Number of read operations=0
   FILE: Number of large read operations=0
   FILE: Number of write operations=0
	
   HDFS: Number of bytes read=480
   HDFS: Number of bytes written=72
   HDFS: Number of read operations=12
   HDFS: Number of large read operations=0
   HDFS: Number of write operations=6
	
Job Counters

   Launched map tasks=1
   Launched reduce tasks=3
	
   Data-local map tasks=1
	
   Total time spent by all maps in occupied slots (ms)=8212
   Total time spent by all reduces in occupied slots (ms)=59858
   Total time spent by all map tasks (ms)=8212
   Total time spent by all reduce tasks (ms)=59858
	
   Total vcore-seconds taken by all map tasks=8212
   Total vcore-seconds taken by all reduce tasks=59858
	
   Total megabyte-seconds taken by all map tasks=8409088
   Total megabyte-seconds taken by all reduce tasks=61294592
	
Map-Reduce Framework

   Map input records=13
   Map output records=13
   Map output bytes=423
   Map output materialized bytes=467
	
   Input split bytes=119
	
   Combine input records=0
   Combine output records=0
	
   Reduce input groups=6
   Reduce shuffle bytes=467
   Reduce input records=13
   Reduce output records=6
	
   Spilled Records=26
   Shuffled Maps =3
   Failed Shuffles=0
   Merged Map outputs=3
   GC time elapsed (ms)=224
   CPU time spent (ms)=3690
	
   Physical memory (bytes) snapshot=553816064
   Virtual memory (bytes) snapshot=3441266688
	
   Total committed heap usage (bytes)=334102528
	
Shuffle Errors

   BAD_ID=0
   CONNECTION=0
   IO_ERROR=0
	
   WRONG_LENGTH=0
   WRONG_MAP=0
   WRONG_REDUCE=0
	
File Input Format Counters

   Bytes Read=361
	
File Output Format Counters

   Bytes Written=72

Step 7 - Use o seguinte comando para verificar os arquivos resultantes na pasta de saída.

$HADOOP_HOME/bin/hadoop fs -ls output_dir/

Você encontrará a saída em três arquivos porque está usando três particionadores e três Redutores em seu programa.

Step 8 - Use o seguinte comando para ver a saída em Part-00000Arquivo. Este arquivo é gerado pelo HDFS.

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000

Output in Part-00000

Female   15000
Male     40000

Use o seguinte comando para ver a saída em Part-00001 Arquivo.

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00001

Output in Part-00001

Female   35000
Male    31000

Use o seguinte comando para ver a saída em Part-00002 Arquivo.

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00002

Output in Part-00002

Female  51000
Male   50000