Apache Kafka - Integração com Storm

Neste capítulo, aprenderemos como integrar o Kafka ao Apache Storm.

Sobre Storm

Storm foi originalmente criado por Nathan Marz e equipe da BackType. Em pouco tempo, o Apache Storm se tornou um padrão para sistema de processamento distribuído em tempo real que permite processar um grande volume de dados. Storm é muito rápido e um benchmark o registrou em mais de um milhão de tuplas processadas por segundo por nó. O Apache Storm é executado continuamente, consumindo dados das fontes configuradas (Spouts) e passa os dados pelo pipeline de processamento (Bolts). Combinados, bicos e parafusos formam uma topologia.

Integração com Storm

Kafka e Storm se complementam naturalmente, e sua poderosa cooperação permite análises de streaming em tempo real para big data em movimento rápido. A integração do Kafka com o Storm é para tornar mais fácil para os desenvolvedores ingerir e publicar fluxos de dados de topologias Storm.

Fluxo conceitual

Um bico é uma fonte de riachos. Por exemplo, um spout pode ler tuplas de um Tópico Kafka e emiti-las como um fluxo. Um bolt consome fluxos de entrada, processa e possivelmente emite novos fluxos. Bolts podem fazer qualquer coisa, desde executar funções, filtrar tuplas, fazer agregações de streaming, joins de streaming, conversar com bancos de dados e muito mais. Cada nó em uma topologia Storm é executado em paralelo. Uma topologia é executada indefinidamente até que você a encerre. Storm irá reatribuir automaticamente quaisquer tarefas com falha. Além disso, Storm garante que não haverá perda de dados, mesmo que as máquinas parem e as mensagens sejam descartadas.

Vamos examinar as APIs de integração do Kafka-Storm em detalhes. Existem três classes principais para integrar o Kafka com o Storm. Eles são os seguintes -

BrokerHosts - ZkHosts e StaticHosts

BrokerHosts é uma interface e ZkHosts e StaticHosts são suas duas principais implementações. ZkHosts é usado para rastrear os corretores Kafka dinamicamente, mantendo os detalhes no ZooKeeper, enquanto StaticHosts é usado para definir manualmente / estaticamente os corretores Kafka e seus detalhes. ZkHosts é a maneira simples e rápida de acessar o corretor Kafka.

A assinatura da ZkHosts é a seguinte -

public ZkHosts(String brokerZkStr, String brokerZkPath)
public ZkHosts(String brokerZkStr)

Onde brokerZkStr é o host do ZooKeeper e brokerZkPath é o caminho do ZooKeeper para manter os detalhes do corretor Kafka.

API KafkaConfig

Esta API é usada para definir as configurações do cluster Kafka. A assinatura de Kafka Con-fig é definida como segue

public KafkaConfig(BrokerHosts hosts, string topic)

    Hosts - Os BrokerHosts podem ser ZkHosts / StaticHosts.

    Topic - nome do tópico.

API SpoutConfig

Spoutconfig é uma extensão do KafkaConfig que suporta informações adicionais do ZooKeeper.

public SpoutConfig(BrokerHosts hosts, string topic, string zkRoot, string id)
  • Hosts - O BrokerHosts pode ser qualquer implementação da interface BrokerHosts

  • Topic - nome do tópico.

  • zkRoot - Caminho de raiz do ZooKeeper.

  • id −O bico armazena o estado dos offsets consumidos no Zookeeper. O id deve identificar exclusivamente o seu bico.

SchemeAsMultiScheme

SchemeAsMultiScheme é uma interface que determina como o ByteBuffer consumido de Kafka é transformado em uma tupla de tempestade. É derivado de MultiScheme e aceita a implementação da classe Scheme. Há muitas implementações da classe Scheme e uma dessas implementações é StringScheme, que analisa o byte como uma string simples. Ele também controla a nomenclatura do seu campo de saída. A assinatura é definida da seguinte forma.

public SchemeAsMultiScheme(Scheme scheme)
  • Scheme - buffer de byte consumido do kafka.

API KafkaSpout

KafkaSpout é nossa implementação de spout, que se integrará ao Storm. Ele busca as mensagens do tópico kafka e as emite no ecossistema Storm como tuplas. O KafkaSpout obtém seus detalhes de configuração em SpoutConfig.

Abaixo está um exemplo de código para criar um bico Kafka simples.

// ZooKeeper connection string
BrokerHosts hosts = new ZkHosts(zkConnString);

//Creating SpoutConfig Object
SpoutConfig spoutConfig = new SpoutConfig(hosts, 
   topicName, "/" + topicName UUID.randomUUID().toString());

//convert the ByteBuffer to String.
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

//Assign SpoutConfig to KafkaSpout.
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

Criação de Parafuso

Bolt é um componente que recebe tuplas como entrada, processa a tupla e produz novas tuplas como saída. Bolts implementará a interface IRichBolt. Neste programa, duas classes de bolt WordSplitter-Bolt e WordCounterBolt são usadas para realizar as operações.

A interface IRichBolt tem os seguintes métodos -

  • Prepare- Fornece ao parafuso um ambiente para execução. Os executores irão executar este método para inicializar o spout.

  • Execute - Processa uma única tupla de entrada.

  • Cleanup - Chamado quando um parafuso vai desligar.

  • declareOutputFields - Declara o esquema de saída da tupla.

Vamos criar SplitBolt.java, que implementa a lógica para dividir uma frase em palavras e CountBolt.java, que implementa lógica para separar palavras únicas e contar sua ocorrência.

SplitBolt.java

import java.util.Map;

import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import backtype.storm.task.OutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.IRichBolt;
import backtype.storm.task.TopologyContext;

public class SplitBolt implements IRichBolt {
   private OutputCollector collector;
   
   @Override
   public void prepare(Map stormConf, TopologyContext context,
      OutputCollector collector) {
      this.collector = collector;
   }
   
   @Override
   public void execute(Tuple input) {
      String sentence = input.getString(0);
      String[] words = sentence.split(" ");
      
      for(String word: words) {
         word = word.trim();
         
         if(!word.isEmpty()) {
            word = word.toLowerCase();
            collector.emit(new Values(word));
         }
         
      }

      collector.ack(input);
   }
   
   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("word"));
   }

   @Override
   public void cleanup() {}
   
   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
   
}

CountBolt.java

import java.util.Map;
import java.util.HashMap;

import backtype.storm.tuple.Tuple;
import backtype.storm.task.OutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.IRichBolt;
import backtype.storm.task.TopologyContext;

public class CountBolt implements IRichBolt{
   Map<String, Integer> counters;
   private OutputCollector collector;
   
   @Override
   public void prepare(Map stormConf, TopologyContext context,
   OutputCollector collector) {
      this.counters = new HashMap<String, Integer>();
      this.collector = collector;
   }

   @Override
   public void execute(Tuple input) {
      String str = input.getString(0);
      
      if(!counters.containsKey(str)){
         counters.put(str, 1);
      }else {
         Integer c = counters.get(str) +1;
         counters.put(str, c);
      }
   
      collector.ack(input);
   }

   @Override
   public void cleanup() {
      for(Map.Entry<String, Integer> entry:counters.entrySet()){
         System.out.println(entry.getKey()+" : " + entry.getValue());
      }
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
   
   }

   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
}

Enviando para topologia

A topologia Storm é basicamente uma estrutura Thrift. A classe TopologyBuilder fornece métodos simples e fáceis para criar topologias complexas. A classe TopologyBuilder possui métodos para definir spout (setSpout) e para definir bolt (setBolt). Finalmente, TopologyBuilder tem createTopology para criar to-pology. Os métodos shuffleGrouping e fieldsGrouping ajudam a definir o agrupamento de fluxos para spout e bolts.

Local Cluster- Para fins de desenvolvimento, podemos criar um cluster local usando LocalCluster objeto e, em seguida, enviar a topologia usando submitTopology método de LocalCluster classe.

KafkaStormSample.java

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;

import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

import backtype.storm.spout.SchemeAsMultiScheme;
import storm.kafka.trident.GlobalPartitionInformation;
import storm.kafka.ZkHosts;
import storm.kafka.Broker;
import storm.kafka.StaticHosts;
import storm.kafka.BrokerHosts;
import storm.kafka.SpoutConfig;
import storm.kafka.KafkaConfig;
import storm.kafka.KafkaSpout;
import storm.kafka.StringScheme;

public class KafkaStormSample {
   public static void main(String[] args) throws Exception{
      Config config = new Config();
      config.setDebug(true);
      config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
      String zkConnString = "localhost:2181";
      String topic = "my-first-topic";
      BrokerHosts hosts = new ZkHosts(zkConnString);
      
      SpoutConfig kafkaSpoutConfig = new SpoutConfig (hosts, topic, "/" + topic,    
         UUID.randomUUID().toString());
      kafkaSpoutConfig.bufferSizeBytes = 1024 * 1024 * 4;
      kafkaSpoutConfig.fetchSizeBytes = 1024 * 1024 * 4;
      kafkaSpoutConfig.forceFromStart = true;
      kafkaSpoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

      TopologyBuilder builder = new TopologyBuilder();
      builder.setSpout("kafka-spout", new KafkaSpout(kafkaSpoutCon-fig));
      builder.setBolt("word-spitter", new SplitBolt()).shuffleGroup-ing("kafka-spout");
      builder.setBolt("word-counter", new CountBolt()).shuffleGroup-ing("word-spitter");
         
      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("KafkaStormSample", config, builder.create-Topology());

      Thread.sleep(10000);
      
      cluster.shutdown();
   }
}

Antes de mover a compilação, a integração Kakfa-Storm precisa da biblioteca Java do curador ZooKeeper. O Curator versão 2.9.1 oferece suporte ao Apache Storm versão 0.9.5 (que usamos neste tutorial). Baixe os arquivos jar especificados abaixo e coloque-os no caminho de classe java.

  • curator-client-2.9.1.jar
  • curator-framework-2.9.1.jar

Depois de incluir os arquivos de dependência, compile o programa usando o seguinte comando,

javac -cp "/path/to/Kafka/apache-storm-0.9.5/lib/*" *.java

Execução

Inicie a CLI do Kafka Producer (explicado no capítulo anterior), crie um novo tópico chamado my-first-topic e forneça alguns exemplos de mensagens, conforme mostrado abaixo -

hello
kafka
storm
spark
test message
another test message

Agora execute o aplicativo usando o seguinte comando -

java -cp “/path/to/Kafka/apache-storm-0.9.5/lib/*”:. KafkaStormSample

O exemplo de saída deste aplicativo é especificado abaixo -

storm : 1
test : 2
spark : 1
another : 1
kafka : 1
hello : 1
message : 2