Entre para a LISTA VIP da Black Friday

00

DIAS

00

HORAS

00

MIN

00

SEG

Clique para saber mais
Alura > Cursos de DevOps > Cursos de Mensageria/Streams > Conteúdos de Mensageria/Streams > Primeiras aulas do curso Kafka: Fast delegate, evolução e cluster de brokers

Kafka: Fast delegate, evolução e cluster de brokers

Novos produtores e consumidores - Introdução

Boas-vindas a mais um curso de Kafka. Neste curso, vamos criar diversos serviços e extrair alguns para trabalhar em um ambiente onde a arquitetura é distribuída.

O que aprenderemos?

Temos cada um desses serviços que podem rodar na mesma máquina ou em máquinas distintas. Cada um deles pode rodar uma ou mais vezes em paralelo, permitindo paralelizar muito do nosso trabalho. No entanto, também surgirão questões em momentos onde queiramos serializar o processamento. E também faremos isso.

Vamos entender um ponto de falha do Kafka quando temos um único broker. Podemos vencer isso através de um cluster de brokers. Ou seja, não só nossos serviços são rodados diversas vezes em um cluster, mas também os brokers do Kafka.

Para isso será importante entendermos como um tópico é armazenado dentro do Kafka e como podemos fazer a replicação desse tópico baseada nas informações que uma liderança possui.

A liderança recebe as informações de escrita e replica isso para suas réplicas, que vão estar em sincronia com a liderança. Quando líderes e réplicas caem, isso nos permite ter mais garantias de que as informações estão armazenadas e serão consumidas em algum momento. Isso é, temos mais reliability (confiabilidade).

Vamos aumentar a confiabilidade, ao mesmo tempo da paralelização, e, ao mesmo tempo, quando quisermos, da serialização. Elas se tornarão uma sequência, uma atrás da outra.

Tudo isso vamos conseguir ganhar simultaneamente à medida que configuramos o Kafka, pensando em nossa arquitetura, no miolo dos nossos serviços — seja na hora que recebemos uma requisição HTTP que vamos implementar, seja na hora que escrevemos em um banco de dados que vamos implementar.

Pré-requisitos

Vamos ver tudo isso acontecendo neste curso. Se você não fez nosso primeiro curso de Kafka, sugerimos que o faça antes ou verifique se já possui o conhecimento que ele aborda.

Além disso, indicamos que baixe o conteúdo do código no Github do primeiro curso.

Vamos começar?

Novos produtores e consumidores - Produtores consumidores e o eager de patterns

Vamos começar. Vamos prosseguir com o projeto do outro curso. Se já concluiu todo o outro curso, terá o mesmo código que nós. Caso contrário, pode baixar do GitHub conforme os links disponibilizados.

Abriremos o projeto no IntelliJ, clicando em "Open" (abrir). Na janela do explorador, escolheremos o diretório "ecommerce" onde o projeto está localizado e pressionaremos o botão "Open". Com isso, teremos o projeto carregado na IDE, com a versão final do último curso.

Este projeto, apenas para lembrar, é um sistema de e-commerce que está simulando o final do processo de venda. A pessoa clicou e gerou uma venda. Após a geração da venda, o que faremos?

Temos um serviço que gera 10 vendas para testarmos. Essas 10 vendas geram 20 mensagens, 10 de e-mail e 10 de novos pedidos de venda. Esses pedidos de venda passam pelo detector de fraude "service-fraud-detector" e os pedidos de e-mail vão para o "service-email". Todas as mensagens são registradas no "service-log".

Repare que temos serviços enviando e serviços escutando mensagens. Queremos mostrar que não tem desafio e misturar as duas coisas, ou seja, enviar e receber mensagens no mesmo serviço,

Criando um serviço que produz e consome

Vamos acessar o diretório do serviço de detecção de fraude que recebe a mensagem de um novo pedido, certo? Vamos acessar o código dele no arquivo FraudDetectorService por meio do caminho de pastas "ecommerce > service-fraud-detector > src > main > java > br.com.alura.ecommerce".

Ele recebe uma mensagem de novo pedido, executa o código em seu interior, aguarda cinco segundos e então informa que a ordem foi processada. Se deu certo ou não, não importa.

Gostaríamos de simular uma situação com alguns casos em que nosso pedido foi aprovado e em outros não, pois houve uma fraude detectada. Vamos dar uma olhada?

Devemos lembrar que quando geramos os pedidos de compra, o valor varia de 1 a 5 mil, mais ou menos. Poderíamos fazer com que pedidos cujo valor for acima de 4.500 sejam recusados.

Claro, na prática, se estamos usando Machine Learning, inteligência artificial, estatística, ou outro algoritmo, deixaremos o estimador nesse serviço — a equipe de Machine Learning implementa e deixa ali. No nosso caso, faremos uma regra simples para tentar detectar a suposta fraude. Nesse caso, se o pedido for muito caro, entrará no quadro de fraude.

Essa detecção de fraude não é uma coisa do mundo real, pois esse não é o foco do curso. Temos outros cursos nos quais o foco é Machine Learning, aprendizado de máquina, detecção de fraude e várias outras coisas.

Abaixo do bloco no qual esperarmos os 5 segundos para simular o algoritmo lento, queremos acessar essa order, que está no record. Podemos pegar o record, cujo value devolve uma order para nós. Isso é a nossa order, certo? Portanto, vamos adicionar um var order e um sinal de igual à direita do código que implementamos.

public class FraudDetectorService {

    // Código omitido
    
    private void parse(ConsumerRecord<String, Order> record) {
        System.out.println("------------------------------------------");
        System.out.println("Processing new order, checking for fraud");
        System.out.println(record.key());
        System.out.println(record.value());
        System.out.println(record.partition());
        System.out.println(record.offset());
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            // ignoring
            e.printStackTrace();
        }
        var order = record.value();
        
        System.out.println("Order processed");
    }
    
}

E agora, podemos fazer o que quisermos com essa order. Contudo, se acessarmos o arquivo Order, na mesma pasta, veremos que não há nenhum order.get nesse modelo que permita acessar o valor dela.

Poderíamos pensar em colocar o get do amount e deixar este último. Pode ser, não tem problema nenhum. Ou poderíamos implementar métodos dentro dele, do tipo "verifica se é fraude". Dessa forma, esse modelo deixa de ser anêmico, tudo bem? Também não tem problema. Contudo, nesse caso vamos criar um getter.

Tem várias maneiras de criar um getter com o IntelliJ. Cada ferramenta e linguagem tem algo do gênero. No nosso caso, digitando get abaixo do bloco public Order, a IDE nos mostra uma caixa de sugestões, na qual podemos selecionar "public BigDecimal getAmount()", para gerar automaticamente a estrutura do get amount.

Poderia até deixar esse cara como público, já que o BigDecimal é imutável e ele é final, não teria esse problema, mas isso não é considerado uma boa prática em Java, então geramos um getter só para aquilo que precisamos.

public class Order {

    private final String userId, orderId;
    private final BigDecimal amount;

    public Order(String userId, String orderId, BigDecimal amount) {
        this.userId = userId;
        this.orderId = orderId;
        this.amount = amount;
    }
    
    public BigDecimal getAmount() {
        return amount;
    }
}

Voltando ao FraudDetectorService, abaixo da var order, adicionaremos um if(). Entre os parênteses, queremos comparar esse getAmount() para saber se ele é maior ou menor. Não temos greater ou lesser, mas no BigDecimal temos o compareTo(). Entre os parênteses, passamos um new BigDecimal("4500").

Passamos o valor 4500 com aspas, porque assim temos a precisão de que é 4.500 exatamente. Se isso daí for maior ou igual a zero, ou seja, >=0, quer dizer que o preço é muito alto, é maior que 4.500.

Esse trecho está fingindo que a fraude acontece quando o valor é maior ou igual a 4.500. Se estamos anotando os comentários em inglês, vamos escrever // pretending that the fraud happens when the amount is >= 4500 entre as chaves do if.

public class FraudDetectorService {

    // Código omitido
    
    private void parse(ConsumerRecord<String, Order> record) {
        // Código omitido
        
        }
        var order = record.value();
        if(order.getAmount().compareTo(new BigDecimal("4500")) >= 0) {
            // pretending that the fraud happens when the amount is >= 4500
        }
        
        System.out.println("Order processed");
    }

}

Mesmo que a lógica seja super simples, é interessante extrair isso no método ou em outra coisa. No IntelliJ, acessaremos a aba de menus superior e clicaremos em "Refactor > Extract > Method" ou usaremos o atalho "Ctrl+Alt+M".

Queremos saber se é uma fraude, então, escreveremos isFraud no campo "Name" da janela "Extract Method" e pressionaremos "Enter". Isso substituirá o conteúdo entre os parênteses do if para isFraud(order) e gerará a estrutura do método private boolean isFraud() abaixo do bloco private void parse().

public class FraudDetectorService {

    // Código omitido
    
    private void parse(ConsumerRecord<String, Order> record) {
        // Código omitido
        
        }
        var order = record.value();
        if(isFraud(order)) {
            // pretending that the fraud happens when the amount is >= 4500
        }
        
        System.out.println("Order processed");
    }
    
    private boolean isFraud (Order order) {
        return order.getAmount().compareTo(new BigDecimal( val: "4500")) >= 0;
    }
}

Podemos implementar esse método de fraude aqui no FraudDetectorService, ou nas classes que fizerem sentido para esse serviço.

Se for uma fraude, em vez de colocar o println() de "OrderProcessed", vamos colocar um "OrderIsFraud". Para isso, vamos recortar o println no fim do código e colar entre as chaves do if, alterando a string de "Order processed" para "Order is a fraud!!!!!".

Se ela não for uma fraude, criaremos um else para dizer que ela não é uma fraude, com um println() da mensagem "Approved: " junto à order aprovada.

public class FraudDetectorService {

    // Código omitido
    
    private void parse(ConsumerRecord<String, Order> record) {
        // Código omitido
        
        }
        var order = record.value();
        if(isFraud(order)) {
            // pretending that the fraud happens when the amount is >= 4500
            System.out.println("Order is a fraud!!!!!");
        } else {
            System.out.println("Approved: " + order);
        }
        

    }
    
    private boolean isFraud (Order order) {
        return order.getAmount().compareTo(new BigDecimal( val: "4500")) >= 0;
    }
}

Temos essas duas maneiras de mostrar.

Repare que agora a classe Order do detector de fraude é diferente da classe Order que envia. Não tem problema, porque o processo de serialização e desserialização só usa os campos.

Conforme falamos no outro curso, o processo de serialização e desserialização pode ser feito de diversas maneiras. Da maneira que estamos fazendo agora, se um serviço precisa de certos métodos e o outro de outro, não tem problema nenhum. Você não precisa colocar as dependências nos dois, misturando e "sujando" um projeto com coisas do outro. Eles estão isolados.

Temos um Getter, que é um método simples e ainda continua anêmico. Mas poderíamos colocar outros métodos, não tem problema.

Vamos ver isso acontecer de verdade? Vamos rodar o nosso FraudDetectorService, clicando dentro do seu código com o botão direito e selecionando "Run 'FraudDetectorService'". Como estará rodando pela primeira vez, uma janela de configurações será exibida, na qual clicaremos no botão "Run".

Ele exibirá uma janela de diálogo, pois está com a configuração errada desde a última vez que rodamos. Mas vamos continuar clicando em "Continue Anyway" e ver se ele vai executar mesmo assim.

Após a execução, acessaremos o arquivo NewOrderMain e tentar rodá-lo também com o atalho "Ctrl+Shift+R". Na tela de configuração exibida, podemos ver no campo "Working directory" que o diretório estava errado e agora está no projeto novo. Mas tudo bem, podemos deixar ele rodar clicando em "Run" e em "Continue Anyway".

Em seguida, vamos rodar. Voltando ao arquivo FraudDetector, vamos clicar em "Ctrl+Shift+R" para rodar. Ele exibirá a janela de configurações na qual está falando que tem um erro no módulo, que não estamos especificando. Vamos encontrá-lo no campo "Use classpath of module", no qual devemos clicar e escolher o módulo "service-fraud-detector". Por fim, clicaremos no botão "Run".

Com isso, o FraudDetector está rodando. Faremos o mesmo processo no arquivo NewOrderMain. Ele deve reclamar também do módulo do campo "Use classpath of module", no qual devemos clicar e escolher o módulo "service-new-order". Por fim, clicaremos no botão "Run".

Portanto, vamos enviar dez mensagens de e-mail e dez mensagens de nova ordem. Na aba "NewOrderMain", aberta no terminal inferior, vamos ver as novas ordens chegando, checando se há fraude, aprovando ou recusando, de acordo com o valor.

Exemplos:

Processing new order, checking for fraud

414e7d24-a493-4024-9dbd-ce9fbf777c69

br.com.alura.ecommerce.Order@202b0582

2

20

Approved: br.com.atura.ecommerce.Order@202b0582

Processing new order, checking for fraud

ba298fd3-2d03-40e2-a857-2c52357178b6

br.com.alura.ecommerce.Order@1ca3b418

2

21

Order is a fraud!!!!!

Contudo, ele não está informando os dados do que foi aprovado nem do que foi uma fraude.

Quando imprimimos essa order, gostaríamos de substituir também o toString() e mostrar a order por completo. No caso do Java, podemos gerar o toString() facilmente. Cada linguagem, cada ferramenta vai ter uma maneira.

Acessando o arquivo Order, abaixo das chaves do getAmount(), basta usar o atalho "Ctrl+N" digitar toString() e selecionar essa opção na lista de sugestões da IDE, clicando posteriormente em "OK" na janela de geração.

Com isso, ele gera o toString() com os três valores para nós, logo abaixo do getAmount().

public class Order {

    // Código omitido
    
    public BigDecimal getAmount() {
        return amount;
    }
    
    @Override
    public String toString() {
        return "Order{" +
        "userId='" + userId + '\'' +
        ", orderId='" + orderId + '\'' +
        ", amount=" + amount +
        '}';
    }
}

Podemos parar a execução do nosso FraudDetectorService e rodar de novo, enviando dez mensagens novas. Com isso, vamos começar a ver todas as informações das orders enviadas no terminal. Entre elas, as ordems cujo amount passar de 4500, serão consideradas fraudes.

Além do nosso serviço poder receber mensagens e ter a vantagem de evoluir independentemente do outro serviço, queremos que ele também possa enviar mensagens. Se queremos isso, precisaremos do nosso KafkaDispatcher.

Enviando mensagens com KafkaDispacher

Vamos colocar um private final KafkaDispatcher dentro da classe FraudDetectorService, abaixo das chaves do public static void main(). O que esse Dispatcher vai despachar? Se tivermos um pedido aceito, poderemos despachar o pedido, ou seja, a nossa Order com letra maiúscula, que será adicionada entre os sinais de menor e maior.

Para criar isso, À direita da <Order> vamos colocar o orderDispatcher que é um new KafkaDispatcher, do tipo Order. Como já definimos o tipo em private final KafkaDispatcher<Order>, não precisamos defini-lo novamente entre os sinais de menor e maior.

public class FraudDetectorService {

    public static void main(String[] args) {
    
    // Código omitido
    
    }
    
    private final KafkaDispatcher<Order> orderDispatcher = new KafkaDispatcher<>();
    
    // Código omitido
}

Temos um Dispatcher agora. E o que podemos fazer com o orderDispatcher? Despachar mensagens.

Se foi, por exemplo, recusado, adicionaremos dentro das chaves do if e abaixo do println() um orderDispatcher.send() e o tópico "ECOMMERCE_ORDER_REJECTED" entre parênteses. Esse tópico indica que nossa order foi rejeitada, seja por causa da fraude ou de outro fator.

Estamos usando o ID da pessoa usuário como chave, portanto, adicionaremos order.getUserID() à direita do tópico. À direita deste, vamos enviar o objeto em si — ou seja, a própria order.

public class FraudDetectorService {

    // Código omitido
    
    private void parse(ConsumerRecord<String, Order> record) {
        // Código omitido
        
        }
        var order = record.value();
        if(isFraud(order)) {
            // pretending that the fraud happens when the amount is >= 4500
            System.out.println("Order is a fraud!!!!!");
            orderDispatcher.send("ECOMMERCE_ORDER_REJECTED", order.getUserId(), order);
        } else {
            System.out.println("Approved: " + order);
        }
        

    }
    // Código omitido
}

Faltou criar o getUserId(). Vamos acessar o arquivo do Order para criá-lo acima do toString.

public class Order {

    // Código omitido
        
    public String getUserId() {
        return userId;
    }
    
    @Override
    public String toString() {
        return "Order{" +
        "userId='" + userId + '\'' +
        ", orderId='" + orderId + '\'' +
        ", amount=" + amount +
        '}';
    }
}

Voltando ao arquivo FraudDetectorService, adicionaremos o orderDispatcher.send() entre as chaves do else, substituindo o REJECTED por APPROVED, para os casos nos quais a ordem foi aprovada.

public class FraudDetectorService {

    // Código omitido
    
    private void parse(ConsumerRecord<String, Order> record) {
        // Código omitido
        
        }
        var order = record.value();
        if(isFraud(order)) {
            // pretending that the fraud happens when the amount is >= 4500
            System.out.println("Order is a fraud!!!!!");
            orderDispatcher.send("ECOMMERCE_ORDER_REJECTED", order.getUserId(), order);
        } else {
            System.out.println("Approved: " + order);
            orderDispatcher.send("ECOMMERCE_ORDER_APPROVED", order.getUserId(), order);
        }
        

    }
    // Código omitido
}

O código pedirá pelas exceções que podem ocorrer, portanto, precisamos lançá-las. Posicionando o cursor em cima do .send() e pressionando "Ctrl+Enter", selecionaremos a sugestão "Add exceptions to method signature" para adicionar as exceções na assinatura dessa função. Isso adicionará o throws ExecutionException, InterruptedException na função parse().

private void parse(ConsumerRecord<String, Order> record) throws ExecutionException, InterruptedException {

// Código omitido

}

Nesse momento, ocorre um erro, porque a função parse lança uma exceção na linha fraudService::parse do método main(). Mas devemos lembrar que o KafkaService, recebe uma ConsumerFunction e esta função por sua vez, não pode lançar nenhuma exceção. Temos que ter cuidado com isso.

Teremos que adicionar as exceções, portanto, vamos adicionar o mínimo possível. Eu, Guilherme, sempre prefiro seguir o caminho do mínimo, adicionando conforme o necessário.

Posicionando o cursor em cima do fraudService::parse e, pressionando "Ctrl+Enter", selecionaremos a sugestão "Add exceptions to method signature" para adicionar as exceções nessa função. Acessando o arquivo ConsumerFunction, veremos o throws de ExecutionException e de InterruptedException na função void consume().

public interface ConsumerFunction<T> {
    void consume(ConsumerRecord<String, T> record) throws ExecutionException, InterruptedException;
}

Vamos saber lidar com isso. Mas como? No arquivo Kafka Service, quando chamamos o método consume() entre as chaves do void run(), queremos tratar esse erro. Se ocorreu esse erro, preciso tratar de alguma maneira tratar.

class KafkaService<T> implements Closeable {

    // Código omitido
    
    void run() {
        while (true) {
            var records = consumer.poll(Duration.ofMillis(100));
            if (!records.isEmpty()) {
                System.out.println("Encontrei " + records.count() + " registros");
                for (var record : records) {
                    parse.consume(record);
                }
            }
        }
    }
    
    // Código omitido
}

Tratamos esse erro lançando uma exceção e parando o serviço completamente por causa dela, ou tratamos a exceção somente para essa mensagem e continuamos executando as próximas. Faremos a segunda opção, clicando no consume(), pressionando "Ctrl+Enter" para abrir as sugestões e selecionando "Surround with try/catch" para implementar um try catch ao redor desse método.

class KafkaService<T> implements Closeable {

    // Código omitido
    
    void run() {
        while (true) {
            var records = consumer.poll(Duration.ofMillis(100));
            if (!records.isEmpty()) {
                System.out.println("Encontrei " + records.count() + " registros");
                for (var record : records) {
                    try {
                        parse.consume(record);
                    } catch (ExecutionException e) {
                        e.printStackTrace();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
    
    // Código omitido
}

O que podemos fazer no tratamento dessa exceção? Nós temos várias opções. Por enquanto, o tratamento será apenas um log. Portanto, adicionaremos a anotação abaixo entre as chaves dos dois catchs, acima dos prints. Ela significa "Até agora, apenas registrando a exceção para esta mensagem".

// so far, just logging the exception for this message
class KafkaService<T> implements Closeable {

    // Código omitido
    
    void run() {
        while (true) {
            var records = consumer.poll(Duration.ofMillis(100));
            if (!records.isEmpty()) {
                System.out.println("Encontrei " + records.count() + " registros");
                for (var record : records) {
                    try {
                        parse.consume(record);
                    } catch (ExecutionException e) {
                        // so far, just logging the exception for this message
                        e.printStackTrace();
                    } catch (InterruptedException e) {
                        // so far, just logging the exception for this message
                        e.printStackTrace();
                    }
                }
            }
        }
    }
    
    // Código omitido
}

Poderíamos armazenar as mensagens que deram errado em algum lugar para saber quais delas estão dando errado. Poderíamos também não "comitar" essa mensagem e deixar tentar de novo, enquanto estiver dando exceção. Portanto, há várias maneiras de lidar com esse erro, nas quais vamos focar quando falarmos apenas sobre tratamento de erro.

Por enquanto, queremos criar um serviço que seja capaz de receber e também enviar mensagens. E é isso que estamos fazendo.

Se estamos fazendo isso falta rodar o log, para ver essas mensagens indo e voltando. Portanto, vamos reiniciar o FraudDetectorService, abrir o arquivo LogService dentro da pasta "service-log" e vamos rodá-lo, para ver todas as mensagens.

Ao executá-lo, teremos a janela que aponta novamente um problema no módulo. No campo "Use classpath of module", vamos escolher o módulo certo, que é "service-log" e clicar no botão "Run".

Em seguida, teremos o LogService rodando em uma aba do terminal inferior. Ele recolherá um monte de logs que já tinha no passado. Com o FraudDetectorService também rodando, enviaremos as 10 mensagens.

Lembrando que enviaremos 10 mensagens de e-mail e 10 mensagens do pedido de compra, que chegam na verificação de fraude. Essas 10 do pedido de compra vão virar 10 novas mensagens, de aprovação ou de rejeição. Vamos ver as 30 mensagens sendo enviadas, de um lado para o outro?

Vamos rodar o NewOrderMain. Acessando a aba do LogService no terminal, veremos que todas as mensagens chegam no log.

LOG: ECOMMERCE_SEND_EMAIL

3daee167-e77b-43db-baf2-793c7c2363dc

"Thank you for your order! We are processing your order!"

LOG: ECOMMERCE_NEW_ORDER

fd5e0bcd-725e-4f1c-90f6-99b6d243a6c4

{"userId":"fd5e0bcd-725e-4f1c-90f6-99b6d243a6c4","orderId":"a5e4edb5-b591-46e4-bc5f-62ee7a0e39a4" …} …

Em seguida, o FraudDetector começará a rodar. Se verificarmos a aba dele no terminal, veremos que ele dirá "enviei", com a mensagem "Order approved" e a informação da ordem.

Processing new order, checking for fraud

9035ad75-dd08-4306-9a6f-5432e8c84b99

Order{userId='9035ad75-dd08-4306-9a6f-5432e8c84b99', orderId='713fc926-4c3a-4b42-8c65-a7dea33940b2 …} …

Já o LogService não está recebendo. Por que ele não está recebendo? Há um cuidado muito importante a se tomar quando estamos trabalhando com patterns (padrões), como no LogService. Quais são os subjects (tópicos) que ele está ouvindo?

Isso não é dinâmico. Não é como se ele fosse ouvir um novo tópico surgindo. Na verdade, ele só começa a ouvir os tópicos na hora que rodamos ele. Ou seja, os tópicos que temos e que servem esse padrão, são aqueles que ele vai ouvir. Se surgir um novo que segue esse padrão, ele não vai ouvir.

E surgiu, certo? Foram criados e enviados dois novos subjects. Ou seja, anteriormente, o LogService não estava escutando.

Agora que esses tópicos existem, podemos executar novamente o LogService e o NewOrderMain. O LogService irá capturar todas as mensagens dos tópicos que já existem, ou seja, do e-commerce de e-mail e dos e-commerces aprovado e rejeitado. Claro que, no caso do rejeitado, teremos que aguardar mais tempo, pois ele acontece com menos frequência.

Resumindo, nós acabamos de criar um consumidor que também é produtor.

Novos produtores e consumidores - Um serviço que acessa bancos externos

O próximo passo é criar um novo serviço. O ponto que queremos destacar agora é a questão da dependência dos projetos, ou melhor, a independência entre eles.

Há uma certa dependência relacionada à estrutura do esquema das mensagens que são enviadas, como o JSON que é transmitido de um lado para o outro. No entanto, nossas classes e nossas dependências internas a um serviço são independentes.

Vamos dar uma olhada em um novo serviço que vai utilizar algo a mais: um banco de dados. Esse será outro serviço externo. O que planejamos criar agora é um novo serviço que, ao receber uma mensagem de uma nova solicitação de compra, verificará se a pessoa é nova, ou seja, se possui um e-mail novo. Se sim, nós vamos inserir essa pessoa no banco de dados.

Implementando um serviço que acessa dados externos

A maneira de fazer isso é criando um serviço que represente o banco de usuários. Este seria o local onde teríamos as informações pessoais das pessoas usuárias, e não gostaríamos que todos os serviços tivessem acesso a isso, para evitar manipulações indesejadas de campos.

Acessando o explorador lateral, vamos criar um novo módulo, da forma que já conhecemos. Na janela de criação, no campo "ArtifactId", vamos nomeá-lo como "service-users", com hífen, pois será onde teremos as pessoas usuárias. Após clicar em "Next", preencheremos o campo "Module name:" com "serviceusers", sem hífen, e em "Finish".

Com isso, teremos criado o nosso projeto "service-users", que abrirá a aba do arquivo pom.xml com a estrutura abaixo pronta.

<?xml version ="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>ecommerce</artifactId>
        <groupId>br.com.alura</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>
    
    <artifactId>service-users</artifactId>
    
</project>

Se vamos utilizar o banco de dados, precisamos de um. O banco que vamos utilizar se chama SQLite. Para encontrá-lo, vamos acessar o site do MVN Repository, na página de download do SQLite JDBC.

Queremos a versão do SQLite para o JDBC, para o Java. Neste vídeo, usaremos a versão 3.28.0. Após clicar nela, vamos copiar o trecho de código exibido na aba "Maven".

<!-- https://mvnrepository.com/artifact/org.xerial/sqlite-jdbc -->
<dependency>
    <groupId>org.xerial</groupId>
    <artifactId>sqlite-jdbc</artifactId>
    <version>3.28.0</version>
</dependency>

Voltando ao código, no arquivo pom.xml do "service-users", vamos abrir e fechar o bloco <dependencies>, abaixo do artifactId, e adicionar a dependência em seu interior.

<?xml version ="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>ecommerce</artifactId>
        <groupId>br.com.alura</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>
    
    <artifactId>service-users</artifactId>
    
    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.xerial/sqlite-jdbc -->
    <dependency>
            <groupId>org.xerial</groupId>
            <artifactId>sqlite-jdbc</artifactId>
            <version>3.28.0</version>
    </dependency>
    </dependencies>
    
</project>

Ao salvar, precisamos que ele recarregue esse arquivo, para que ele já seja baixado. Como no vídeo já temos o auto-reload, ótimo. Caso contrário, podemos acessar a aba do Maven, na lateral direita, escolher a pasta do projeto "service-users" e selecionar o botão "Reimport All Maven Projects", no canto superior esquerdo.

Após a dependência ser baixada, podemos criar uma classe java na pasta "java" que há no interior do nosso service-users, percorrendo o caminho "service-users > src > main > java". A classe se chamará br.com.alura.ecommerce.CreateUserService, isto é, um serviço que cria pessoas usuárias. Após pressionar "Enter" na janela de criação, o arquivo CreateUserService será exibido em uma nova aba.

public class CreateUserService {

}

Assim como o FraudDetectorService, ele escutará a mensagem de nova ordem de compra. Portanto, vamos copiar todo o conteúdo entre as chaves dessa classe e colar na classe atual. Na janela "Select Classes to Import" (selecionar classes para importar), devemos selecionar todas as classes.

public class CreateUserService {

    public static void main(String[] args) {
        var fraudService = new FraudDetectorService();
        try (var service = new KafkaService<>(FraudDetectorService.class.getSimpleName(),
                "ECOMMERCE_NEW_ORDER",
                fraudService::parse,
                Order.class,
                Map.of())) {
            service.run();
        }
    }
    
    private final KafkaDispatcher<Order> orderDispatcher = new KafkaDispatcher<>();

    private void parse(ConsumerRecord<String, Order> record) throws ExecutionException, InterruptedException {
        System.out.println("------------------------------------------");
        System.out.println("Processing new order, checking for fraud");
        System.out.println(record.key());
        System.out.println(record.value());
        System.out.println(record.partition());
        System.out.println(record.offset());
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            // ignoring
            e.printStackTrace();
        }
        var order = record.value();
        if(isFraud(order)) {
            // pretending that the fraud happens when the amount is >= 4500
            System.out.println("Order is a fraud!!!!!");
            orderDispatcher.send("ECOMMERCE_ORDER_REJECTED", order.getUserId(), order);
        } else {
            System.out.println("Approved: " + order);
            orderDispatcher.send("ECOMMERCE_ORDER_APPROVED", order.getUserId(), order);
        }
        

    }

    private boolean isFraud (Order order) {
        return order.getAmount().compareTo(new BigDecimal( val: "4500")) >= 0;
    }

}

Mas vamos observar que, no método main() e realizar modificações. Na primeira linha, queremos criar um CreateUserService e não um FraudDetectorService. Já na segunda linha, entre os parênteses de new KafkaService(), a classe do nosso serviço é o CreateUserService, portanto, vamos adicioná-la no lugar de FraudDetectorService.

public class CreateUserService {

    public static void main(String[] args) {
        var fraudService = new CreateUserService();
        try (var service = new KafkaService<>(CreateUserService.class.getSimpleName(),
                "ECOMMERCE_NEW_ORDER",
                fraudService::parse,
                Order.class,
                Map.of())) {
            service.run();
        }
    }
    
    // Código omitido
}

Voltando à segunda linha do método main(), queremos usar o KafkaService, mas temos que adicionar a dependência do common-kafka. Para importar, faremos "Ctrl+Enter" no KafkaService em vermelho e selecionaremos "Add dependency on module 'common-kafka'" na lista de sugestões.

Queremos usar o common-kafka no arquivo pom.xml de "service-users". Acessando-o, podemos ver que o common-kafka foi adicionado na lista de dependências, só que queremos rodar sem o compile. Na hora de executar, queremos executar para valer, portanto, removeremos a linha <scope>compile</scope> da dependência common-kafka.

<?xml version ="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>ecommerce</artifactId>
        <groupId>br.com.alura</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>
    
    <artifactId>service-users</artifactId>
    
    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.xerial/sqlite-jdbc -->
    <dependency>
            <groupId>org.xerial</groupId>
            <artifactId>sqlite-jdbc</artifactId>
            <version>3.28.0</version>
    </dependency>
    <dependency>
        <groupId>br.com.alura</groupId>
        <artifactId>common-kafka</artifactId>
        <version>1.0-SNAPSHOT</version>
    </dependency>
    </dependencies>
    
</project>

Voltando ao arquivo CreateUserService temos o CreateUserService, que é o nosso Consumer. Ele vai consumir uma ordem, portanto, não alteraremos a linha Order.class dentro do método main(). Contudo, a classe Order é exibida em vermelho e precisa ser importada, pois precisamos dela. Vamos copiar qual order? A de envio, porque ela é a mais simples de todas. Se precisarmos de mais detalhes, podemos adicionar posteriormente.

Para copiá-la, acessaremos o explorador lateral, clicaremos na classe Order dentro da pasta "service-new-order", copiaremos com "Ctrl+C", clicaremos na pasta "br.com.alura.ecommerce" dentro da pasta "service-users" e colaremos com "Ctrl+V". Na janela "Copy class" exibida, pressionaremos o botão "OK".

Isso gerará um novo arquivo Order dentro da pasta "service-users". Ele será exibido automaticamente em uma nova aba, a qual podemos fechar.

Resumindo, não precisamos criar dependências para o Order. Queremos somente ter nossa própria cópia de um pedido, com campos e o que mais quisermos.

Voltando ao arquivo CreateUserService, temos um KafkaDispatcher abaixo do método main(). Não precisamos despachar nada, somente criar no banco, portanto, vamos eliminar essa linha.

Abaixo dela, temos o método parse(). Em seu interior, quando recebemos uma nova mensagem de uma nova order, queremos verificar se é uma pessoa usuária nova. Portanto, entre os parênteses do segundo println, vamos substituir a mensagem "Processing new order, checking for fraud" por "Processing new order, checking for new user".

Vamos imprimir o valor e mais nada, portanto, vamos remover os printlns da key, da partition, e do offset. Só o valor para nós é suficiente.

Também não precisamos de um Thread.sleep(), podemos executar rapidamente e ponto final. Portanto, vamos remover todo o bloco try catch desse método.

Por fim, removeremos todo o bloco if e else que realiza uma ação, e também deletaremos o método isFraud(). O código completo do arquivo CreateUserService após as alterações pode ser visto abaixo.

public class CreateUserService {

    public static void main(String[] args) {
        var fraudService = new CreateUserService();
        try (var service = new KafkaService<>(CreateUserService.class.getSimpleName(),
                "ECOMMERCE_NEW_ORDER",
                fraudService::parse,
                Order.class,
                Map.of())) {
            service.run();
        }
    }

    private void parse(ConsumerRecord<String, Order> record) throws ExecutionException, InterruptedException {
        System.out.println("------------------------------------------");
        System.out.println("Processing new order, checking for new user");
        System.out.println(record.value());
        System.out.println(record.partition());
        var order = record.value();     
    }

}

Nós queremos fazer algo com essa order. Com o nosso código do CreateUserService pronto, só precisamos acessar o banco e fazer algo.

Agora é a hora em que usamos o acesso ao banco da nossa biblioteca ou linguagem. Queremos criar um serviço que utiliza um banco de dados. Poderia ser outro serviço externo, como enviar e-mails, enviar notificação push, salvar arquivos em disco, etc. No nosso caso, queremos um banco de dados.

Criando a conexão com o banco

Vamos criar uma conexão com o banco, assumindo que esse serviço só roda uma única vez e implementaremos de uma maneira que vamos rodá-lo uma única vez. Isso é importante.

Com outros serviços daria para rodar quantas vezes quisermos. No nosso caso, com banco de dados, se estivéssemos rodando com o MySQL, o Postgre, ou outro, de maneira remota, poderíamos rodar esse serviço quantas vezes quisermos, mantendo vários processos paralelos. No nosso caso, vamos rodar um SQLite, que vai salvar um arquivo em disco e terá apenas uma instância rodando.

Na primeira linha entre as chaves da classe CreateUserService, criaremos um construtor CreateUserService. Entre as chaves deste, por sua vez, queremos abrir a conexão com o banco. Para isso, criaremos uma String url que receberá uma URL de conexão com o banco, jdbc:sqlite:users_database.db. Isso criará o arquivo users_database.db.

Abaixo dessa linha, vamos criar a conexão com this.connection igual a DriverManager.getConnection(), e a url entre parênteses.

Posicionando o cursor no connection, pressionaremos "Ctrl+Enter" e selecionaremos "'Create field connection' in 'CreateUserService'" para criar a variável private final Connection connection acima do construtor.

public class CreateUserService {

    private final Connection connection;
    
    CreateUserService() {
        String url = "jdbc:sqlite:users_database.db";
        this.connection = DriverManager.getConnection(url);
    }

    // Código omitido
}

Lembrando que poderíamos usar o hibernate ou outra ferramenta de outra linguagem para gravar no banco. Isso não importa. Estamos usando o JDBC puro, porque queremos ser diretos. Afinal, não estamos focados em coisas avançadas de banco de dados, não é a nossa questão aqui. Nossa questão é de serviços consumidores, serviços que acessam serviços externos, etc.

O getConnection() pode lançar exceção. Vamos lançá-la sem nos preocupar com isso, posicionando o cursor no getConnection, pressionando "Ctrl+Enter" e selecionando "'Add exception to method signature" para gerar o comando throws SQLException no construtor.

public class CreateUserService {

    private final Connection connection;
    
    CreateUserService() throws SQLException {
        String url = "jdbc:sqlite:users_database.db";
        this.connection = DriverManager.getConnection(url);
    }

    // Código omitido
}

Queremos criar uma tabela. Para criar isso, adicionaremos um connection.createStatement() abaixo do getConnection(url) para criar um statement da maneira mais simples, pois não teremos concatenação de strings.

À direita do statement, executamos ele com ponto execute(). Nesse statement, ou seja, entre os parênteses do execute(), teremos os seguintes elementos:

public class CreateUserService {

    private final Connection connection;
    
    CreateUserService() throws SQLException {
        String url = "jdbc:sqlite:users_database.db";
        this.connection = DriverManager.getConnection(url);
        connection.createStatement().execute("create table Users (" + 
            "uuid varchar(200) primary key,"
            "email varchar(200))");
    }

    // Código omitido
}

Esse é o createStatement() que gostaríamos de executar. Temos que ter cuidado com ele, pois se a tabela já existe, queremos ignorar. Na primeira vez ele vai criar, e na segunda vez temos que ter cuidado.

Vamos deixar da maneira que está. Na segunda vez que rodarmos, veremos o problema acontecer.

Acessando novamente o método main(), vamos lançar a exceção que ela possui, utilizando o mesmo método de antes, com a opção "Add exception to method signature".

public class CreateUserService {

    private final Connection connection;
    
    CreateUserService() throws SQLException {
        String url = "jdbc:sqlite:users_database.db";
        this.connection = DriverManager.getConnection(url);
        connection.createStatement().execute("create table Users (" + 
            "uuid varchar(200) primary key,"
            "email varchar(200))");
    }

    public static void main(String[] args) {
        var fraudService = new CreateUserService();
        try (var service = new KafkaService<>(CreateUserService.class.getSimpleName(),
                "ECOMMERCE_NEW_ORDER",
                fraudService::parse,
                Order.class,
                Map.of())) {
            service.run();
        }
    }
    
    // Código omitido
}

Se rodarmos esse serviço, ele deverá fazer duas coisas: ficar escutando e criar a tabela. Vamos rodar, pressionando "Ctrl+Shift+R" ou "Cmd+Shift+R", dependendo do sistema operacional.

Ele rodará e abrirá a aba do terminal, na parte inferior. Quando clicamos no botao de lista suspensa "CreateUserService", no canto superior direito da IDE, e clicamos na opção "Edit configurations" (editar configurações), abrimos a janela de configurações de execução.

Em seu interior, verificaremos o campo "Working directory" no qual constataremos que o CreateUserService, ao ser executado, roda no diretório "projeto-atual-ecommerce".

Se voltarmos ao explorador lateral esquerdo, buscar esse diretório, clicar com o botão direito nele e selecionar "Synchronize 'ecommerce'", veremos o arquivo users_database.db, logo abaixo do pom.xml que pertence ao diretório "service-users".

Após constatar que ele está funcionando, vamos deletar esse arquivo gerado. Voltando ao construtor do CreateUserService, vamos modificar sua url para "jdbc:sqlite:target/users_database.db". Dessa forma, a tabela será salva dentro da pasta "target", evitando commits no repositório.

public class CreateUserService {

    private final Connection connection;
    
    CreateUserService() throws SQLException {
        String url = "jdbc:sqlite:target/users_database.db";
        this.connection = DriverManager.getConnection(url);
        connection.createStatement().execute("create table Users (" + 
            "uuid varchar(200) primary key,"
            "email varchar(200))");
    }
    
    // Código omitido
}

Vamos pressionar "Ctrl+Shift+R" para rodar o CreateUserService de novo. Em seguida, vamos pressionar "Synchronize 'ecommerce'" novamente no diretório "projeto-atual-ecommerce".

Com isso, se observarmos o interior da pasta "target", veremos o users_database.db lá dentro. Então, criamos a nossa tabela, porque não deu erro.

O que queremos fazer agora? Quando recebemos uma nova mensagem de nova order, queremos verificar se já existe essa pessoa usuária com esse e-mail no sistema. No interior do CreateUserService, entre as chaves do parse() e abaixo da var order, adicionaremos um if(isNewUser(order.getEmail())) e um par de chaves, para verificar se essa pessoa usuária com esse e-mail existe.

public class CreateUserService {

    // Código omitido

    private void parse(ConsumerRecord<String, Order> record) throws ExecutionException, InterruptedException {
        System.out.println("------------------------------------------");
        System.out.println("Processing new order, checking for new user");
        System.out.println(record.value());
        System.out.println(record.partition());
        var order = record.value();
        if(isNewUser(order.getEmail())) {
        
        }
    }

}

Se já existe uma pessoa usuária com esse e-mail, não vamos fazer nada. Mas queremos fazer algo se for uma pessoa usuária nova.

O getEmail() não existe, portanto, vamos pressionar "Ctrl+Enter" nele e selecionar "Create method 'getEmail' in 'Order'" para criá-lo no arquivo Order. Seremos direcionados automaticamente para ele, onde teremos a estrutura do getEmail() pronta, abaixo do Order().

public class Order {

    // Código omitido

    public Order(String userId, String orderId, BigDecimal amount) {
        this.userId = userId;
        this.orderId = orderId;
        this.amount = amount;
    }
        
    public Object getEmail() {
    
    }
    
    // Código omitido
}

Ele vai devolver uma string, portanto, substituiremos o tipo Object por String.

Por enquanto, não estamos devolvendo nada, pois não temos o e-mail. Entre as chaves, vamos devolver qualquer coisa, portanto, escreveremos return "email", já que este será o retorno do processo de compra.

public class Order {

    // Código omitido

    public Order(String userId, String orderId, BigDecimal amount) {
        this.userId = userId;
        this.orderId = orderId;
        this.amount = amount;
    }
        
    public String getEmail() {
        return "email";
    }
    
    // Código omitido
}

Voltaremos ao arquivo CreateUserService. Na função parse(), temos o if(isNewUser()) portanto, vamos criar o método isNewUser() selecionando a sugestão "Create method 'isNewUser' in 'CreateUserService'". Isso gerará essa função abaixo do parse().

Entre suas chaves, por padrão, vamos retornar que é um novo usuário, ou seja, return true.

public class CreateUserService {

    // Código omitido
    
    private boolean isNewUser(String email) {
        return true
    }

}

Se é um novo usuário, queremos inserir. Então, entre as chaves do if(isNewUser() do método parse(), vamos adicionar insertNewUser(order.getEmail()). Estamos chamando o getter duas vezes para poder extrair.

Por fim, vamos criar o método insertNewUser() selecionando a sugestão "Create method 'isNewUser' in 'CreateUserService'". Isso gerará essa função abaixo do parse() e acima do isNewUser().

public class CreateUserService {

    // Código omitido

    private void parse(ConsumerRecord<String, Order> record) throws ExecutionException, InterruptedException {
        System.out.println("------------------------------------------");
        System.out.println("Processing new order, checking for new user");
        System.out.println(record.value());
        System.out.println(record.partition());
        var order = record.value();
        if(isNewUser(order.getEmail())) {
            insertNewUser(order.getEmail());
        }
    }
    
    private void insertNewUser(String email) {
    
    }
    
    private boolean isNewUser(String email) {
    return true
    }
    
}

Vamos implementar as duas funções, insertNewUser() e isNewUser(). Na primeira, faremos um insert. Entre suas chaves, o connection prepara um statement para nós com prepareStatement(), que será o "insert into Users ()". Entre os parênteses, vamos adicionar os campos que tem no Users: o uuid e o email da pessoa usuária.

Faltam os valores. Portanto, adicionaremos um sinal de mais e o values (?, ?) entre aspas duplas.

Este será o nosso statement que preparamos. Temos que lançar uma exceção, porque pode ocorrer erro. Para isso, vamos pressionar "Ctrl+Enter" em prepareStatement e selecionar "Add exception to method signature", o que gerará um throws SQLException.

A linha connection.prepareStatement() nos devolve um statement de insert, portanto, preferimos chamá-lo de insert.

Abaixo dessa variável, pediremos que ela defina uma string com o comando insert.setString(). Entre seus parênteses, adicionamos o número 1 e o primeiro parâmetro, o uuid da pessoa usuária, entre aspas duplas. Desceremos uma linha e criaremos um segundo insert.setString()com o número 2 e o email.

Na próxima linha, pedimos ao insert que execute com insert.execute(). Após a execução, a pessoa usuária foi adicionada. Na próxima linha, faremos um System.out.prinlt("Usuário uuid e " + email + " adicionado"). Podemos colocar o UUID dessa forma e utilizar uma interpolação simples.

public class CreateUserService {

    // Código omitido
    
    private void insertNewUser(String email) throws SQLException {
        var insert = connection.prepareStatement(sql: "insert into Users (uuid, email) + values (?,?)");
        insert.setString(1, "uuid");
        insert.setString(2, email);
        insert.execute();
        System.out.println("Usuário uuid e " + email + " adicionado");
    }
    
    // Código omitido
    
}

Esse código seria para inserir. Falta o UUID.

Voltando ao parse(), vamos lançar a exceção do insertNewUser(), clicando nele e selecionando "Add exception to method signature". Isso gerará o SQLException.

public class CreateUserService {

  // Código omitido

    private void parse(ConsumerRecord<String, Order> record) throws ExecutionException, InterruptedException, SQLException {
        // Código omitido
    }

Quando lançamos exceções, temos que ter cuidado. Não estamos precisando das exceções ExecutionExceptione InterruptedException, portanto, vamos removê-las.

public class CreateUserService {

  // Código omitido

    private void parse(ConsumerRecord<String, Order> record) throws SQLException {
        // Código omitido
    }

Quando lançamos a SQLException , passamos a ter um erro na fraudService::parse dentro do método main().

Voltando a esse método, veremos que esquecemos de substituir o nome da variável fraudService para createUserService. Vamos fazer isso nas duas linhas desse método em que elas aparecem.

public class CreateUserService {

    // Código omitido

    public static void main(String[] args) throws SQLException{
        var createUserService = new CreateUserService();
        try (var service = new KafkaService<>(CreateUserService.class.getSimpleName(),
                "ECOMMERCE_NEW_ORDER",
                createUserService::parse,
                Order.class,
                Map.of())) {
            service.run();
        }
    }
    
    // Código omitido
}

O nosso KafkaService recebe um ConsumerFunction, que não lança SQLException.

Na prática, é raro colocarmos simplesmente throws Exception. São só em momentos em que realmente queremos tratar qualquer tipo de exceção. Isso vale para qualquer linguagem.

Mas esse é um momento em que queremos isso. Queremos ser capazes de nos recuperar e ir para outra mensagem quando recebermos uma mensagem no nosso KafkaService.

Vamos voltar ao KafkaService.java e, dentro do void run(), substituiremos o ExecutionException e no primeiro catch para Exception e. Também vamos remover totalmente o segundo catch.

Na primeira linha do catch que restou, vamos adicionar o comentário // only catches Exception because no matter which Exception I want to recover and parse the next one, dividido em duas linhas.

class KafkaService<T> implements Closeable {

    // Código omitido
    
    void run() {
        while (true) {
            var records = consumer.poll(Duration.ofMillis(100));
            if (!records.isEmpty()) {
                System.out.println("Encontrei " + records.count() + " registros");
                for (var record : records) {
                    try {
                        parse.consume(record);
                    } catch (ExecutionException e) {
                        // only catches Exception because no matter which Exception
                        // I want to recover and parse the next one
                        // so far, just logging the exception for this message
                        e.printStackTrace();
                    }
                }
            }
        }
    }
    
    // Código omitido
}

É isso que queremos fazer. Não importa o que aconteça, queremos pegar a próxima. Por enquanto, deixaremos dessa maneira.

Então, trouxemos SQLException e não tem problema. Contudo, temos um problema no arquivo CreateUserService. Faltou implementar o isNewUser() no final do código para verificar se é um usuário novo.

Entre as chaves dele, acima do return true, também queremos pegar a connection, preparar com ,prepareStatement() que é "select". À direita deste, vamos buscar uuid from Users, dentro das mesmas aspas. À direita das aspas, vamos concatenar um "where email" igual a alguma coisa — só estamos interessados em trazer um, portanto, será limit 1.

Esta linha é nosso query se existir, portanto, criaremos um var exists que a receberá.

public class CreateUserService {

    // Código omitido
    
    private boolean isNewUser(String email) {
        var exists = connection.prepareStatement("select uuid from Users " +
        "where email = ? limit 1");
        return true;
    }
    
}

Na linha seguinte, definiremos uma string no exists com .setString(). O primeiro é o email que estamos procurando, portanto, adicionaremos o 1 e o email entre os parênteses.

Na linha seguinte, faremos o exists executar a query com .executeQuery(). Só que teremos que lançar uma exceção do tipo SQLException, clicando no exists.setString e selecionando "Add exception to method signature" para gerar o comando throws SQLException na linha de declaração do isNewUser().

Isso nos devolve os resultados, portanto, adicionaremos um var results para receber o exists.executeQuery().

Existe se tem próxima linha. Se tem próxima linha, é porque existe. Por isso, na linha seguinte, adicionaremos um return !results.next(). Se o next(), que quer dizer "vá para a próxima linha", for bem sucedido, é porque existe. Então, não é um usuário novo.

Por fim, podemos retirar o return true.

public class CreateUserService {

    // Código omitido
    
    private boolean isNewUser(String email) throws SQLException {
        var exists = connection.prepareStatement("select uuid from Users " +
        "where email = ? limit 1");
        exists.setString(1, email); 
        var results = exists.executeQuery();
        return !results.next();
    }
    
}

Assim, estamos verificando se o usuário é novo. E assim, estamos inserindo a nossa pessoa usuária de acordo com a nossa order no banco.

Vamos parar para pensar: isso quer dizer que, quando temos uma nova compra, enviamos a mensagem com a compra. Quer dizer que a pessoa preencheu seus dados no site ou aplicativo, com o seu e-mail, os dados da sua compra, e enviou. Nesse caso, ela gera o UUID?

Às vezes, não. Ela já tem um identificador único, que é o e-mail. Já o identificador único do tipo UUID não acontece na hora da compra, quando a pessoa preenche o formulário. Mas existem situações onde a gente quer fazer isso também. Não tem problema.

Contudo, no nosso caso, quando realizamos uma compra, não temos o UUID, somente o e-mail. Com isso, temos um problema arquitetural que exige a mudança de todo o nosso esquema de "comitação", de um lado para o outro, porque a order do new Order() não possui userId, orderId e amount, mas sim email, orderId e amount.

Isso irá para o CreateUserService. Quando este cria ou busca uma pessoa usuária no banco, sabemos seu ID e se ele existe ou não. Ou seja, esse ID só existirá depois do CreateUserService, se ela for uma pessoa usuária nova.

Na prática, temos que cuidado com isso. Só faz sentido rodar o sistema de fraude e outras coisas depois de inserir a pessoa usuária no banco. Isso é uma decisão que temos que tomar: faz sentido rodar depois? Rodar antes? Rodar com as informações da mensagem?

De acordo com a decisão que tomarmos no nosso sistema, teremos um ou outro caminho seguido pelas mensagens.

A seguir, vamos adaptar nossos esquemas para isso. Por enquanto, temos um serviço capaz de armazenar e buscar dados de um banco de qualquer biblioteca.

Sobre o curso Kafka: Fast delegate, evolução e cluster de brokers

O curso Kafka: Fast delegate, evolução e cluster de brokers possui 139 minutos de vídeos, em um total de 24 atividades. Gostou? Conheça nossos outros cursos de Mensageria/Streams em DevOps, ou leia nossos artigos de DevOps.

Matricule-se e comece a estudar com a gente hoje! Conheça outros tópicos abordados durante o curso:

Aprenda Mensageria/Streams acessando integralmente esse e outros cursos, comece hoje!

Conheça os Planos para Empresas