Alura > Cursos de DevOps > Cursos de Mensageria/Streams > Conteúdos de Mensageria/Streams > Primeiras aulas do curso Kafka: Batches, correlation ids e dead letters

Kafka: Batches, correlation ids e dead letters

Batch - Introdução

Bem-vindos a mais um curso de Kafka. Nele, vamos ver diversas situações do dia a dia onde as falhas ocorrem. Vamos ter serviços enviando mensagens para diversos lugares diferentes ou diversas mensagens baseadas numa única mensagem, e erros podem ocorrer nesse processo. Como lidar com essas situações? Com o erro na recuperação de um erro? Vamos ver como trabalhar de novo com rebalanceamento, partições, líderes, réplicas, tudo isso em um volume maior agora. Tudo isso, conseguindo trackear o processo de como uma mensagem no começo do serviço gera um fluxo de mensagens até onde de repente consigo trackear de volta onde gerou o erro. Isso tudo é fundamental para termos uma camada própria em cima do Kafka e possamos trabalhar com isso de uma maneira transparente, benéfica para os nossos projetos.

Batch - Simulando a geração de relatórios

Vamos carregar o projeto que terminamos no último curso. Se você não fez, não se preocupe, durante as atividades você vai baixar esse código. Claro, assumindo que você já conhece o conteúdo. Se não conhece, faça os dois primeiros cursos e continue aqui.

Tenho vários pequenos projetos no meu sistema de e-commerce. Até agora, tudo que fizemos tem se baseado numa requisição que executa uma tarefa para um usuário. Até fizemos no servidor http uma classe capaz de trabalhar com uma requisição que cria uma nova compra.

Eu queria fazer um processo que executa várias tarefas de uma vez só. Uma coisa que podemos fazer é o relatório mensal que envio para os meus usuários. Se o e-commerce é de PDF de livros online, eu poderia gerar um relatório de leitura mensal. Se o usuário quiser, ele pode pedir esse relatório, e no final do mês geramos para todos.

Eu queria ver o processo de batch. Como executo vários de uma vez. O quão diferente seria. Vai ser bem simples. Primeiro, quero gerar um relatório de estudo. Vou ter um serviço novo. Crio em novo módulo. Meu padrão é serviço alguma coisa. Tenho meu relatório de leitura, que é o reading report. Dou next, crio meu projeto. Dentro, posso criar meu serviço, que recebe uma mensagem dizendo querer um relatório para aquele usuário.

Como outros serviços que temos. Eu poderia usar um deles como base. Vou usar o fraude detector service. Ele usa o pedido de compra para detectar se tem fraude. Vou copiar o pacote inteiro. Dentro, vou ter um reflector e o meu reading report service.

Primeiro, criamos, instanciamos esse objeto para inserir o report services. Tem que importar o projeto, adicionar dependência. Ele vai estar escutando pedidos de relatório. É user reading report. Agora é um pedido de geração. Vamos só receber os dados. Depois, gerar um relatório e fazer algo.

O relatório está ligado a um usuário. Temos a classe user? Ainda não. Vou receber um. A classe está ligada a cada projeto. Esse user é desse projeto. Preciso aqui o user, em que vamos definir o id ou e-mail. Vou deixar apresentado como string.

Gostamos de simular essas coisas, para ver. Eu queria pegar um arquivo que tivesse o modelo do meu txt do meu relatório. Dentro do Resources, vou criar um novo arquivo. Seria informações básicas.

Vou pegar esse modelo, esse arquivo, que é um new file. Vou usar o path relativo para buscar. Vou chamar isso de source. Quero copiar esse arquivo para algum diretório.

Se eu dou enter, não existe o IO. Vou ter que criar em outro arquivo. Ele é baseado no usuário. O path para esse cara vai ser simples, vai ser o target, um arquivo com uuid mais report txt.

Quero copiar para IO. Ele recebe um path, um target. Quero garantir que esse diretório existe. Vou pegar o diretório pai e vou pegar os diretórios necessários. Depois, digo para copiar do sourcing para o target. Só que se já existir, vou pedir para sobrescrever.

Agora quero dar um append nesse arquivo. Quero adicionar uma linha. Falo files.write. Nesse arquivo, vou escrever o conteúdo e falo que quero adicionar o append. Seria o processo de gerar o relatório que você quer gerar.

Terminamos. Conseguimos colocar o target. Terminei de gerar o relatório, quero notificar os usuários. Eu poderia enviar um e-mail com esse relatório. Podemos fazer um processo.

Conseguimos ler e enxergar algo em disco. O pedido pode vir de um usuário só, mas eu queria fazer o patch. Vamos ter que fazer isso de alguma maneira.

Agora vamos precisar de outro serviço, capaz de gerar as mil mensagens. Vou fazer isso no http. Vou assumir que teremos uma parte de admin. Essa parte deve ficar onde? Não tem resposta exata. No nosso caso, vamos manter no mesmo servidor http.

Batch - Generalização de processo de batch assíncrono e http fast delegate

Nossos próximo passo é ser capaz de receber uma requisição http que gera todos os relatórios. Vou dar um copypaste na servlet. Ao invés do new order servlet, vamos ter um generate all reports servlet. E esse vai ser o admin generate.

Precisamos criar essa servlet. É o mesmo esquema que o new order servlet. Vou um copypaste. Só mandamos o usuário que vamos gerar, então é um user dispatcher. Precisamos puxar a classe user. Nesse outro projeto ela só precisa do id.

No doget, não vou ler parâmetro. Quero fazer um for para cada usuário dentro de todos os usuários. Vamos no nosso reading report services e vamos mandar nesse tópico o nosso usuário. Posso enviar o id dele, porque assim garanto que se tiver dois usuários com o mesmo id, eu ia executar um depois do outro. Mas se um usuário pedir três relatórios, eu executo o segundo relatório daquele usuário só depois de terminar o primeiro, assim um não tem preferência em relação ao outro sem querer.

Só falta ter acesso a todos os usuários. É verdade. Só que lembram que lá atrás eu falei de uma sacada do http que é importante? Você quer delegar para alguém, porque se de repente o servidor cai no meio, você gerou metade só. E não tem nem como controlar, porque a pessoa vai atualizar na tela dela. Enquanto na mensagem não. Como nós vamos gerar, temos mais controle.

O que fazer quando temos um serviço de http? Damos a resposta o mais rápido possível. Delega esse trabalho para alguém e dá uma resposta. Nós temos algumas abordagens. Uma seria já dar o ok e disparar uma única mensagem falando gere todos os relatórios. É o caminho.

Alguém vai escutar isso. Vai pegar todos os usuários e para cada usuário disparar a mensagem. Como nosso usuário final já sabe que o processo está sendo executado, não vai dar F5. Claro que o serviço pode parar no meio, e vamos lidar com isso de diversas maneiras. O que eu queria parar para pensar é quem vai ficar escutando as mensagens? Em qual desses projetos devemos colocar alguém escutando?

Poderíamos colocar no service reading report. Podemos colocar alguém que se chama generate all reports service, que vai escutar essa única mensagem e fazer o for para todos os usuários. Minha pergunta é se esse serviço tem acesso ao id de todos os usuários. Não tem. Como faço para ele ter?

Ou esse serviço pergunta para outro, faz uma requisito http, demora, mas recebe. É uma abordagem. Outro caminho é perguntar para outro serviço quais são os ids, só de maneira assíncrona ficaria esperando. Ele poderia enfiar a mão no banco de dados desse outro serviço create users. É válido. É feio, porque você criou uma ligação que não é mais só o esquema e semântica do tópico das mensagens. O que liga os serviços hoje é a semântica do tópico das mensagens, o que significa o tópico. A chave, que vai dizer se ele está em paralelo ou se é alisado. E o esquema da mensagem. Se eu fizer isso, estou ligando atrás do banco de dados. Aí volto a trabalhar naquela história de todos acessarem o mesmo bando e seus problemas.

Ou eu poderia eu mesmo ter uma lista desses usuários. Toda vez que um novo usuário é criado, disparo uma mensagem dizendo que temos um usuário novo, com o id do usuário. Se eu tenho um serviço que quer manter atualizados todos os ids dos usuários, não tem problema. Você entraria em um problema de réplica se precisar apagar um id. Mas repare que o vínculo dos serviços passou a ser também o que eles estão interessados e outras coisas do gênero. É uma abordagem.

Vamos para outra. A quarta abordagem é fazer com que nosso generate mando uma mensagem que diz que para todos os usuários quero executar uma mensagem. Mando uma mensagem do tipo taskdispatcher, que vai ser executada para todos os usuários. Posso ter um dispatcher que vai executar algo para todos os usuários. Ele vai pegar e enviar uma mensagem para todos.

Eu vou fazer dessa maneira, para vermos tudo feito por mensagens. O tópico da mensagem vai ser o user generate read report. Tenho um único serviço que escuta isso, faz o for para cada usuário e para cada um envia essa mensagem. Se você tiver que gerar outra tarefa para todos os usuários, é só chamar esse aqui e a mensagem que você quer executar.

Dessa maneira, não mantive uma réplica de todos os ids comigo e não perguntei de maneira assíncrona perguntar para todos os usuários. Está sendo de maneira síncrona. Não vou perguntar, vou notificar.

Vamos precisar do dispatcher. Repare que não é mais de usuário. É simples, que manda o tópico. Vou chamar de batch dispatcher, que simplesmente executa para todo mundo. A chave vou usar a mesma, porque só tem uma.

Preciso em algum serviço que tem acesso ao banco receber essa mensagem. Posso criar um novo serviço aqui dentro. Esse cara vai precisar abrir uma conexão com o banco, do método main, do parse. No método main, ele cria o serviço.

Temos que tomar cuidado com o tipo de classe que ele recebe. Não iremos trabalhar com o usuário, mas com uma string. Quem também recebe string é o de e-mail. Quando eu receber essa mensagem quer dizer processem new batch. O batch é para o tópico. O tópico para o qual vou mandar tudo. Tiro o for dali e jogo em outro lugar. Para cada usuário vou enviar essa mensagem agora.

O fraude detector é um serviço que tem um dispatcher. A gente simplesmente criou um e usamos. É a mesma coisa. Ele vai para cada usuário, pega o id e manda gerar o relatório. Tínhamos colocado o user no http e-commerce, mas precisamos dele aqui no nosso service users.

Faltou pegar todos os usuários. Preciso de um getallusers. Fazemos isso da mesma forma que com o banco. Ele vai passar por cada um dos usuários. Só para lembrar, o users tem um campo chamado uuid. Então, seleciono uuid dos usuários. Só isso. Isso devolve para nós os results.

Vou ter uma lista de usuários. Poderia usar a lista de string? Poderia, você não precisaria de um modelo, funcionaria. A sacada foi que o http e-commerce service tem uma requisição possível que bate e envia diretamente o fast delegate. Para cada um dos usuários, quero que você mande a mensagem. Ele vai fazer um for para cada um e um send message. No body da mensagem quem vai estar é o próprio usuário. Ele vai invocar isso mil vezes, cem vezes, e para cada ele vai executar o gerador de relatório.

Sobre o curso Kafka: Batches, correlation ids e dead letters

O curso Kafka: Batches, correlation ids e dead letters possui 135 minutos de vídeos, em um total de 28 atividades. Gostou? Conheça nossos outros cursos de Mensageria/Streams em DevOps, ou leia nossos artigos de DevOps.

Matricule-se e comece a estudar com a gente hoje! Conheça outros tópicos abordados durante o curso:

Aprenda Mensageria/Streams acessando integralmente esse e outros cursos, comece hoje!

Conheça os Planos para Empresas