Alura > Cursos de DevOps > Cursos de Mensageria/Streams > Conteúdos de Mensageria/Streams > Primeiras aulas do curso Kafka e Spring: integrando aplicações e gerenciando fluxos de dados

Kafka e Spring: integrando aplicações e gerenciando fluxos de dados

Kafka com Spring - Apresentação

Olá! Eu sou o Eduardo Santana, sou arquiteto de software, e nesse curso, vou te mostrar alguns conceitos interessantes do Kafka!

Eduardo é um homem branco de cabelo curto preto e barba preta. Ele veste uma camisa preta e está sentado em uma cadeira preta em frente a uma parede cinza iluminada em roxo, com uma estante à sua esquerda e uma planta à sua direita.

O que vamos aprender?

Um dos conceitos que vamos abordar é a conexão de uma aplicação Spring com Kafka. Também veremos como configurar produtor e consumidor, e como fazer outras configurações no Spring, como retentativas.

Em seguida, aprenderemos a processar fluxo de dados com o Kafka Streams. Por fim, conheceremos duas ferramentas muito legais de usar com o Kafka: o Schema Registry e o Kafka Connect.

Pré-requisitos

Um dos pré-requisitos para esse curso é conhecer Spring. Não vamos abordar detalhes do Spring; imaginamos que você, estudante, já sabe construir uma rota, conectar no banco de dados, e assim por diante.

Além disso, é necessário um conhecimento médio/avançado em Kafka. Então, é importante saber o que é um produtor e um consumidor. Vamos mostrar como implementar no Spring, mas não vamos explicar desde o começo o que é um produtor e um consumidor, então assumimos que você já tenha algum conhecimento prévio de Kafka.

Iremos apresentar o curso seguindo sempre o exemplo em que vamos processar um sistema de pagamentos semelhante ao PIX. Uma pessoa usuária irá mandar um pagamento, nós vamos enviar esse pagamento para um tópico do Kafka, e então um consumidor irá processá-lo, para conferir se foi válido ou não.

Vamos começar? Espero você no próximo vídeo!

Kafka com Spring - Integrando o Spring com o Kafka

Nesse vídeo, aprenderemos a conectar uma aplicação Spring com o Kafka.

Nos cursos anteriores, você conheceu algumas bibliotecas para conectar uma aplicação Java com o Kafka. Porém, o Spring tem alguns conceitos bastante conhecidos, como a inversão de controle e a injeção de dependências, que fazem com que, embora seja possível usar essas bibliotecas, não seja o ideal, pois não conseguiremos usar esses conceitos do Spring.

Então, o Spring tem algumas bibliotecas próprias para conectar com o Kafka. É o que veremos ao longo desse curso!

Integrando o Spring com o Kafka

Começaremos rodando a aplicação para mostrar como vamos desenvolvê-la. Ainda não veremos o código, mas sim como ela irá funcionar. Teremos uma rota REST, que veremos no Postman, na qual vamos salvar os pagamentos como se fossem PIX.

Caminho da rota:

http://localhost:8080/pix

Ela terá, basicamente, uma chave de origem (chaveOrigem), que é a chave de pagamento, como funcionam, por exemplo, o CPF e o e-mail; uma chave de destino (chaveDestino), correspondente à chave de quem está sendo pago; e um valor (valor).

{
    "chaveOrigem": "123,
    "chaveDestino": "456",
    "valor": 5000
}

Enviaremos isso para uma rota REST clicando no botão "Send".

Internamente, a rota irá colocar as informações do pagamento em um tópico do Kafka. A partir disso, a nossa outra aplicação, que será o consumidor, vai pegar essa mensagem e processar o PIX.

No Postgres do pgAdmin, vamos salvar as informações do pagamento. Teremos a seguinte tabela:

#id / [PK] integervalor / double precisiondata_transferencia / timestamp without time zonechave_destino / character varying (255)chave_origem / character varying (255)identifier / character varying (255)status / character varying (255)
1250002023-05-26 10:04:15.221051456123aa0a3c1b-53e6-485b-a094-c1c13307f530ERRO

Temos a chave de origem "123" e a chave de destino "456". Na tabela, está indicado o status "ERRO", porque no momento em que o consumidor pega a mensagem para processar, ele valida essas chaves de origem e de destino, para conferir se elas existem. Nesse caso, elas não existiram.

Na nossa aplicação, teremos outra tabela, onde podemos adicionar as chaves configuradas. Vamos configurar as duas chaves que usamos na aplicação e salvar.

#id / [PK] integerchave / character varying (255)
1+2456
2+1123

Agora as chaves existem, então vamos enviar novamente o mesmo PIX no Postman. Feito isso, ao retornar para o pgAdmin, teremos outro registro na tabela, com o status "PROCESSADO":

#id / [PK] integervalor / double precisiondata_transferencia / timestamp without time zonechave_destino / character varying (255)chave_origem / character varying (255)identifier / character varying (255)status / character varying (255)
1250002023-05-26 10:04:15.221051456123aa0a3c1b-53e6-485b-a094-c1c13307f530ERRO
2350002023-05-26 10:05:27.001795456123613e3819-cc4e-4d6f-8ae2-3796fa85fd09PROCESSADO

Essa será a nossa aplicação: temos uma rota REST que recebe as informações de um pagamento; uma aplicação, que é o produtor, vai pegar as informações dessa rota e colocá-las em um tópico no Kafka; por fim, a outra aplicação, que é o consumidor, vai pegar a mensagem do tópico do Kafka e processar o pagamento.

Por que fazemos isso?

Imagine que o fluxo de pagamentos pode crescer muito a depender da época, então se tentarmos processar o pagamento diretamente na rota REST, quando o número aumentar muito (no período do Natal, por exemplo), pode ser que a rota não aguente o fluxo.

Então, em vez de tentar processar na hora, colocamos no tópico e vamos processando conforme a demanda. Assim, nossa aplicação fica muito mais escalável do que se tentarmos processar diretamente na rota.

Kafka com Spring - Implementando o produtor

Sabemos que vamos precisar de algumas bibliotecas específicas do Spring para acessar o Kafka, e já vimos a aplicação que iremos implementar. Agora vamos começar!

Implementando o produto

Com o IntelliJ aberto, vamos entender como é a aplicação Spring. Existem alguns arquivos no nosso projeto, sendo o principal deles o PixController.java, onde estão as rotas REST.

Nós temos somente uma rota, a que salva o PIX (salvarPix()).

@PostMapping
public PixDTO salvarPix(@RequestBody PixDTO pixDTO) {

    pixDTO.setIdentifier(UUID.randomUUID().toString());
    pixDTO.setDataTransferencia(LocalDateTime.now());
    pixDTO.setStatus(PixStatus.EM_PROCESSAMENTO);
    
    return pixService.salvarPix(pixDTO);
}

Nessa rota, recebemos as informações do PIX que queremos enviar, que vimos no Postman no vídeo anterior: a chave de origem, a chave de destino e o valor.

Dentro da rota, definimos o identificador do pagamento (setIdentifier()), a data de transferência (setDataTransferencia()), e o status (setStatus()).

Conforme visto anteriormente no Postman, o status fica EM_PROCESSAMENTO inicialmente.

Além desse arquivo, temos a classe PixDTO.java, que contém as informações do PIX.

@Getter
@Setter
@NoArgsConstructor
public class PixDTO {
    private String identifier;
    private String chaveOrigem;
    private String chaveDestino;
    private Double valor;
    private LocalDateTime dataTransferencia;
    private pixStatus status;

Temos também o status do pix no arquivo PixStatus.java, que pode ser EM_PROCESSAMENTO, PROCESSADO ou ERRO.

public enum PixStatus {
    EM_PROCESSAMENTO, PROCESSADO, ERRO
}

Na pasta "model", temos as duas classes que representam o modelo da nossa aplicação:

Outro arquivo é o PixRepository.java, correspondente à classe do Spring que acessa o banco de dados. Na última pasta, temos o PixService.java, que é a classe principal. Nesse arquivo, por enquanto, salvamos apenas o PIX no banco de dados.

@Autowired 
private final PixRepository pixRepository;

public PixDTO salvarPix(PixDTO pixDTO) {
    pixRepository.save(Pix.toEntity(pixDTO));
    return pixDTO;
}

Nessa primeira implementação, recebemos a informação do PIX e a salvamos no banco de dados usando a variável pixRepository. O que faremos será adicionar o acesso ao Kafka.

Antes de fazer a implementação do acesso ao Kafka, uma coisa importante é saber onde ficam as configurações no Kafka, onde colocamos qual Kafka estamos acessando e outras configurações do tipo. Faremos isso na classe ProducerKafkaConfig, do arquivo ProducerKafkaConfig.java.

Usando a ideia de inversão de controle do Spring, fazemos toda a configuração do acesso ao Kafka na classe com a notação @Configuration.

@Configuration
public class ProducerKafkaConfig {

// Código suprimido

Precisamos de 3 principais configurações no produtor:

@Value(value = "${spring.kafka.bootstrap-servers:localhost:9092}")
private String bootstrapAddress;

@Bean
public ProducerFactory<String, PixDTO> producerFactory() {
    Map<String, Object> configProps = new HashMap<>();
    configProps.put(
            ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
            bootstrapAddress);
    config.Props.put(
            ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            StringSerializer.class);
    configProps.put(
            ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            JsonSerializer.class);
    return new DefaultKafkaProducerFactory<>(configProps);
}

É isso que precisamos para configurar o acesso ao Kafka. Agora vamos entender como implementar de fato o acesso!

O processo é simples. Vamos começar acessando o arquivo PixService.java. Assim como temos o acesso ao banco de dados, faremos algo semelhante para acessar o Kafka.

Então, digitaremos a notação @Autowired para fazer a injeção de dependência, outro conceito importante do Spring. Em seguida, vamos usar a combinação de modificadores private final e a classe KafkaTemplate, que é da biblioteca do Kafka no Spring.

Logo depois, vamos incluir entre aspas angulares o tipo da chave, que é String, conforme comentado anteriormente na configuração; e o tipo do valor que vamos enviar para o Kafka, que é PixDTO.

Por fim, criamos o objeto kafkaTemplate.

@Autowired
private final KafkaTemplate<String, PixDTO> kafkaTemplate;

Para enviar mensagens, logo após salvar o PIX no banco de dados (método salvarPix()), vamos digitar kafkaTemplate seguido do método send() e passar para ele o objeto pixDTO.

public PixDTO salvarPix(PixDTO pixDTO) {
    pixRepository.save(Pix.toEntity(pixDTO));
    kafkaTemplate.send(pixDTO);
    return pixDTO;
}

Se quisermos, podemos mandar também a chave. Para isso, passamos para o método send() o pixDTO.getIdentifier(), de modo que a chave do PIX seja o identificador.

public PixDTO salvarPix(PixDTO pixDTO) {
    pixRepository.save(Pix.toEntity(pixDTO));
    kafkaTemplate.send(pixDTO.getIdentifier(), pixDTO);
    return pixDTO;
}

Algo importante que precisamos adicionar é o nome do tópico que queremos mudar. Nesse caso, vamos chamar de pix-topic, mas poderia ser qualquer outro nome de sua preferência.

public PixDTO salvarPix(PixDTO pixDTO) {
    pixRepository.save(Pix.toEntity(pixDTO));
    kafkaTemplate.send(topic: "pix-topic", pixDTO.getIdentifier(), pixDTO);
    return pixDTO;
}

Conclusão

Basicamente, é isso que precisamos fazer para mandar uma mensagem para o Kafka em uma aplicação Spring: usar o KafkaTemplate e usar o método send() em salvarPix().

Pensando em uma aplicação Spring, é muito mais simples usar a biblioteca do Spring. Assim, não precisamos declarar vários clientes do Kafka e configurar isso diretamente. Fizemos a configuração de forma separada no arquivo ProducerKafkaConfig.java, e somente com uma linha já conseguimos mandar a mensagem para o Kafka.

O produtor está pronto e podemos mandar os PIX para a fila do Kafka!

Sobre o curso Kafka e Spring: integrando aplicações e gerenciando fluxos de dados

O curso Kafka e Spring: integrando aplicações e gerenciando fluxos de dados possui 110 minutos de vídeos, em um total de 45 atividades. Gostou? Conheça nossos outros cursos de Mensageria/Streams em DevOps, ou leia nossos artigos de DevOps.

Matricule-se e comece a estudar com a gente hoje! Conheça outros tópicos abordados durante o curso:

Aprenda Mensageria/Streams acessando integralmente esse e outros cursos, comece hoje!

Conheça os Planos para Empresas