Alura > Cursos de Data Science > Cursos de Engenharia de Dados > Conteúdos de Engenharia de Dados > Primeiras aulas do curso Apache Beam: Data Pipeline com Python

Apache Beam: Data Pipeline com Python

Data Pipeline - Apresentação

Boas-vindas! Meu nome é Pedro Felipe e acompanharei você no curso sobre Apache Beam.

Esse curso é destinado tanto para quem quer ingressar quanto para quem já ingressou no mercado de engenharia ou de ciência de dados e quer melhorar a sua pipeline, sua esteira ou sua gestão de dados dentro da sua ferramenta já utilizada. Além de aprender a utilizar uma nova ferramenta no seu portfólio otimizando os seus processos do dia a dia.

O pré-requisito deste curso é apenas saber o básico de Python, para que, a partir dele, nós consigamos utilizar o SDK do Apache Beam para aplicar todas as transformações necessárias para concluir o nosso processo, que terá como base a análise da quantidade de chuvas e casos de dengue em estados brasileiros.

Nós vamos realizar a ingestão de dados utilizando a SDK do Apache Beam, vamos trazer o Apache Beam para dentro aqui do nosso projeto, o nosso código Python, e trazer também uma série de classes, métodos, módulos necessários para realizar todos os processos que vamos criar, desde a ingestão de dados até a persistência.

Começando pela ingestão de dados, utilizaremos o ReadFromText, onde vamos fazer a leitura, trazer para dentro do processo tanto o dado bruto dos casos de dengue, que é esse dataset aqui, nós temos várias informações sobre a quantidade de casos de dengue por cidades, vamos fazer vários tratamentos para trabalhar esse dado bruto.

E também os dados de quantidade de chuva ou precipitação meteorológica, e nós temos de várias estações meteorológicas também nesse arquivo bruto, nesse “sample_chuvas.csv”.

Após realizar a leitura dos arquivos, trazer essa informação para dentro do nosso processo, nós vamos aplicar também várias transformações e essas transformações vão refletir as regras de negócio que vamos definir durante a nossa fase de análise.

Nós vamos utilizar o método Map para realizar algumas transformações, iremos utilizar também o FlatMap para realizar, também, transformações, agrupamentos pela chave, também combinação por chave passando um método de soma. Vamos aplicar aqui diversas transformações para ir trabalhando o dado passo a passo dentro das pipelines.

E por fim, vamos persistir esse dado em algum formato. No nosso caso, nós vamos persistir em um formato de arquivo de texto, que é o CSV, para que possamos depois utilizar esse arquivo para fazer análise.

Então nós vamos utilizar outro método aqui do Apache Beam, que é o WriteToText, para consolidar todos esses tratamentos em um arquivo do tipo CSV e esse arquivo vai ficar com essa cara.

Cada coluna aqui é um dado específico, como, por exemplo, estado, ano, mês, chuva e dengue, trazendo todos os resultados segundo as transformações dos processos que aplicamos.

E após isso vamos pegar esse dataset aqui que criamos, todo transformado e trabalhado conforme as regras de negócio no processo de engenharia de dados, e vamos trazê-lo para um ambiente bem conhecido do cientista de dados, que é o Jupyter Notebook, utilizando aqui a biblioteca, no caso, o Pandas, vamos trazê-lo para dentro do nosso processo em um dataframe, lendo aquele CSV.

E vamos aplicar aqui alguns métodos do Pandas para trazer e analisar, tirar correlações dos dados que processamos. Temos bastante conteúdo legal para estudar, vamos começar o nosso curso?

Data Pipeline - Data Pipeline

A Alura Analytics, uma empresa de análise de dados, solicitou ao time de engenharia de dados o desenvolvimento de uma base que consolidasse informações de casos de dengue e de precipitação meteorológica, ou seja, de quantidade de chuvas.

Esses dados servirão para que o time de ciência de dados da empresa possam, a partir da base de informações que vamos criar, fazer análises, tirar insights e correlações entre a quantidade de chuvas e os casos de dengue.

Iniciaremos criando e configurando o nosso ambiente de desenvolvimento. Estou utilizando o sistema operacional Windows, porém utilizo o WSL.

O WSL, que é um subsistema Windows para Linux, permite que nós, desenvolvedores, possamos ter esse terminal onde podemos digitar comandos do Linux e isso, pelo menos para mim, como desenvolvedor, facilita bastante na hora do desenvolvimento. Mas você pode utilizar o seu sistema operacional Windows ou diretamente o Linux, que você gosta mais de utilizar.

No terminal, aqui no WSL, nós temos uma versão da distribuição Linux, para visualizar isso eu vou digitar o comando uname, ele vai me retornar aqui que eu estou utilizando o sistema operacional Linux. Se eu digitar uname -a, ele vai me retornar toda a informação da minha distribuição.

Eu posso também verificar qual a distribuição que estou utilizando com o comando lsb_release -a, e ele vai me retornar uma distribuição Ubuntu 16.04.7. Vou limpar aqui meu terminal.

Como nós vamos utilizar no decorrer do nosso curso a linguagem de programação Python, vou verificar aqui também a minha instalação do Python na minha máquina virtual ou aqui no meu WSL. Digitando aqui python --version, ele me retornou o Python 3.7.

E também utilizaremos o pip para gerenciar essas nossas bibliotecas, as nossas dependências que utilizaremos no nosso projeto. Também vou verificar a instalação dele, pip --version, retornou a versão do meu pip e também onde ele está referenciando é o meu Python 3.7. Vou limpar novamente o meu terminal.

Vou criar agora um diretório ou uma pasta, onde vamos colocar todos os códigos dos nossos projetos. Vou digitar aqui mkdir, vou chamá-lo alura-curso-beam. Vou para esse diretório que eu acabei de criar, alura-curso-beam e agora eu estou aqui no diretório onde vamos colocar os nossos códigos.

Como estamos utilizando a linguagem Python e o pip para desenvolver o nosso projeto aqui e as dependências, no caso, o pip, vou utilizar também um ambiente virtual. Esse ambiente virtual vai facilitar o nosso desenvolvimento, porque todas as versões que nós instalarmos agora das bibliotecas irão funcionar na mesma versão se você estiver vendo esse curso lá no futuro. Então eu vou utilizar uma virtual ENV.

Para criar uma virtual ENV eu digito python -m venv e o nome do meu ambiente virtual que eu vou criar. Eu vou criá-lo chamado env-beam, aperto “Enter” e ele vai criar aqui o meu ambiente virtual.

Agora que criamos o nosso ambiente virtual, vejamos, digitando o comando ls, que ele criou outra pasta aqui dentro do meu diretório do curso, “env-beam”.

Para eu ativar o meu ambiente virtual aqui no meu terminal eu preciso direcionar um arquivo que está nessa pasta que vá ativar esse ambiente e eu posso começar a instalar minhas dependências, e essas minhas dependências que estão nesse ambiente virtual, estão totalmente separadas do restante do meu sistema operacional.

Eu vou digitar o comando source e direcionar para o arquivo que vai ativar esse meu ambiente virtual, ele está lá em env-beam/bin/activate. Quando eu aperto “Enter” aparece agora aqui no canto do meu terminal, à esquerda, o nome do ambiente que eu criei, “(env-beam)”.

Se eu digitar o comando pip list ele vai mostrar todas as dependências que já vieram com essa minha versão. Eu tenho aqui o pip e tenho aqui o setuptools, o que vamos fazer agora é instalar nossa primeira dependência, que é o Apache Beam.

Então pip install apache-beam, digitei “Enter”. Digitei errado o nome install, vamos corrigir. E agora ele vai fazer a instalação do Apache Beam dentro desse meu ambiente virtual e ele vai estar como dependência do nosso projeto. Vou aguardar a instalação e já retorno.

Finalizou a instalação do meu Apache Beam, esse warning aqui é só porque ele quer que eu dê um upgrade na minha versão do pip, eu vou fazer depois.

Mas só para verificar se a instalação ocorreu corretamente, vou digitar aqui um pip list e agora vejam que ele me retornou uma série de dependências, porque eu pedi para instalar o Apache Beam, que ele instalou na versão 2.27, porém para a instalação do Apache Beam ocorrer, ele precisava de outras dependências, por isso todas essas dependências aqui foram instaladas, para suprir a necessidade do Apache Beam.

Só para testar mesmo a instalação do Apache Beam, vou limpar o meu terminal e vou acionar o Python, vou abrir o Python diretamente do meu terminal. Se eu digitar python ele abriu aqui o meu terminal interativo, se eu digitar print(‘Apache beam’) ele vai printar o Apache Beam, é um terminal como outro qualquer, e eu vou fazer só a importação para verificar se importou da maneira correta, import apache_beam.

A importação ocorreu, está tudo correto. Se eu fizer um dir, que vai retornar (apache_beam), ele vai me retornar todos os métodos que eu posso utilizar a partir dessa minha importação. Vou limpar meu terminal e saio do terminal com “Ctrl + D”, vou limpar novamente.

Estarei utilizando como editor de código o VS Code. Para abrir o VS Code diretamente aqui na pasta onde eu estou, que é a pasta onde nós vamos desenvolver todas as atividades e os códigos do curso, é só digitar code . e ele vai abrir o VS Code dentro já desse diretório para que possamos fazer a criação dos primeiros códigos.

Abriu aqui o meu VS Code, vejam que ele está na pasta lá que nós determinamos, que é a do curso, onde vamos colocar os códigos, e eu vou criar aqui um arquivo. Cliquei aqui à esquerda em “New File”, vou digitar o nome dele, vou chamá-lo de “main.py” e agora eu tenho no meu editor de código como começar a digitar os primeiros comandos, e por enquanto nós só vamos fazer um teste, o mesmo teste que fizemos lá no terminal.

Vou fechar essa aba “Python - Get Started”, vou fazer a importação do Apache Beam, import apache_beam e vou colocar aqui um print, só para dizer que importou, (‘Apache beam importado!’).

E eu vou também dar um print aqui com os métodos, (dir, que ele vai mostrar tudo o que eu tenho lá dentro do Apache Beam que eu acabei de importar, os métodos e as classes disponíveis, e eu vou colocar aqui (apache_beam)). Salvei.

Eu vou executar – é mais uma coisa que eu gosto de fazer e prefiro – diretamente do terminal esse código main, mas se você utiliza já o VS Code e quiser utilizar aqui a parte de debug, de run dele, fique à vontade. Se você estiver também utilizando outra IDE, como o PyCharm, também vai funcionar da mesma forma.

Vou retornar aqui o meu terminal e vou verificar o que temos agora aqui. Além daquela pasta do nosso ambiente virtual, nós temos também um arquivo chamado “main.py”. Vou executá-lo, é só digitar python e o nome do arquivo, main.py, e ele deve fazer a mesma coisa que fez no terminal interativo. Vou executá-lo.

Após a execução, nós vemos que ele “printou” Apache beam importado! e depois tudo o que vem lá no método dir, todas as classes e métodos disponíveis para podermos utilizar a partir do Apache Beam. Vou limpar aqui o terminal.

Agora que nós temos o nosso ambiente criado e testado, o que nós precisamos saber para criar as bases com as informações de casos de dengue e quantidade de chuva, que é o problema que nós vamos resolver? Vamos retornar um pouco no slide.

Dentro do processo de engenharia de dados, estaremos transformando dados geralmente brutos, ou seja, sem que tenham passado por nenhum tipo de processo, filtro ou regra, em uma informação útil para análise em outros processos.

Esses dados geralmente estão em arquivos de textos separados por um delimitador ou então em arquivos posicionais, bem como em bancos de dados, mas a maioria das vezes eles vêm realmente em arquivos de texto geralmente muito grandes.

Quem trabalha com engenharia de dados é responsável por gerenciar, otimizar, supervisionar e monitorar recuperação, armazenamento e distribuição de dados em toda a organização, ou seja, estaremos responsáveis desde a ingestão ou aquisição dos dados até a disponibilização e persistência dessas informações após tratamentos e transformações.

Para realizar essas transformações, utilizaremos algoritmos. Esses algoritmos são criados para pegar esses dados brutos e transformando a informação que seja útil para análise, como já foi falado.

Além de ter um conhecimento de lógica e de uma linguagem de programação, é necessário entender os objetivos da empresa ou do cliente, ou seja, as regras de negócio envolvidas em todo aquele processo que estamos realizando.

Além disso, otimizamos a recuperação, ingestão e persistência dos dados e desenvolvimento dos dashboards, relatórios e outros tipos de visualização, não só para os cientistas de dados, mas como também para outros setores da empresa, como a área de produtos, financeiro e acompanhamento de clientes.

Dentro do processo de engenharia de dados, criamos e trabalhamos com pipelines. As pipelines são etapas ou parte distintas do processo engenharia de dados. É como se eu pegasse todo o processo, aplicasse as regras de negócio, dividindo-o em etapas distintas.

Essa separação do processo visa principalmente satisfazer essas regras de negócio, ou seja, o que eu tenho que resolver. Porém, também ela é muito necessária para que possamos realizar a paralelização da execução desses processos. Por que essa paralelização?

Geralmente na engenharia de dados trabalhamos com muitos dados, ou seja, com grande quantidade de dados de arquivos, por exemplo, de texto. Otimizar esse processo é uma maneira eficaz utilizando o paralelismo.

Quando paralelizamos essa informação em diversos processos que estão sendo executados ao mesmo tempo, temos uma execução mais rápida daquele processo, que geralmente tem que ocorrer todo dia, então isso tem que ocorrer de maneira mais rápida e performática possível.

Agora que entendemos um pouco mais sobre o nosso problema, o ferramental que iremos utilizar e temos o nosso ambiente preparado, vamos analisar os dados brutos que iremos utilizar para gerar a base de informação com os casos de dengue e quantidade de chuvas que disponibilizaremos ao final do curso.

Data Pipeline - Dados brutos

Agora que temos o nosso ambiente pronto, vamos analisar os dados que devemos utilizar durante todo esse processo, ou seja, vamos analisar os dados brutos de casos de dengue e de precipitação meteorológica para avaliarmos como prosseguir e aplicar as transformações necessárias no decorrer do nosso processo.

Vou voltar no meu editor de código, na minha pasta do meu projeto. Aqui à esquerda, na pasta do nosso projeto, nós temos agora dois arquivos, que são os casos de dengue, que é um arquivo de texto, e temos o “chuvas”, que é um arquivo .csv. Esses arquivos são disponibilizados para vocês utilizarem no processo e nós vamos começar a avaliar esses dados.

Inicialmente nós temos aqui o dataset de quantidade de chuvas. Vejamos que é um dataset até grande, nós temos aqui mais de 23 milhões de linhas, ou seja, mais de 23 milhões de informações a respeito dessa quantidade de chuvas.

E eles estão divididos, nós podemos perceber que na primeira linha, no header ou cabeçalho, nós temos a informação de data, mm, que seria milímetros, uf, ou seja, nós temos a data capturada dessa informação, a quantidade de chuva, nós temos a quantidade de chuva em milímetros e nós temos o estado onde foi contabilizado isso.

Aqui nós temos uma data separada por um hífen, “2015-09-02,”, ou seja, a vírgula é o que separa a informação do que é data para a próxima informação, que é a quantidade de chuva em milímetros, que aqui está “-9999.0”. Depois vírgula, nós temos a próxima informação, que é o estado.

Vemos aqui que nós vamos precisar fazer uma série de tratamentos nesse dataset. Vemos que a quantidade de chuvas está negativa, -9999.0. Essas informações foram adquiridas em estações meteorológicas, com certeza na leitura dessa informação ela teve um erro, então nós temos que tratar essa informação, porque essa medida não existe, ela é 0, não tem quantidade de chuva negativa.

Aqui para baixo já está correta, “0.0”, nós já temos aqui outro estado, São Paulo, então tem muita informação, aqui nós já temos uma quantidade de chuva nesse dia, nessa estação, que foi 11.4, isso nós já começamos a avaliar um pouco esse dataset.

Nós temos aqui nos casos de dengue um dataset já um pouco maior. Agora nós temos outro delimitador, aqui nas chuvas a informação era delimitada por vírgula, agora nós temos nos casos de dengue o pipe, a barra em pé. E nós temos outras informações, nós temos o ID, “data_iniSE”, o que é isso? É a data do início da semana epidemiológica, como o dataset veio distribuído.

Nós temos a quantidade de casos de dengue, nós temos um código do IBGE, que é um código que eles têm uma tabela relacionada à cidade e o código que ela tem, por exemplo, Abaiara, aqui no meu estado, Ceará, tem esse código aqui “230010”.

Nós temos o nome da cidade, temos o estado, nós temos o CEP, vou para o lado um pouco, latitude e longitude, ou seja, nós temos bem mais informações nesse dataset de casos de dengue.

Vou abrir aqui o slide que eu montei de uma forma mais fácil para visualizarmos e entendermos. Nós temos aqui o mesmo dataset, é a mesma informação. Vemos aqui a data, quantidade de chuvas em milímetros e uf.

Eu acho que já deu para vocês notarem, a informação que temos em comum entre os dois datasets seria o estado, nós temos aqui o Estado do Pará e nós temos a quantidade de chuvas e também a data, nós temos uma data aqui também.

E essa data, no caso aqui, é dia a dia, então nós temos várias leituras no mesmo dia. Veja aqui, várias leituras no dia 2, várias leituras no dia 3 e assim consecutivamente.

Quando avaliamos aqui o dataset de casos de dengue, vemos que ele já não é naquele mesmo padrão, isso é um problema que nós temos que resolver.

Nós temos aqui também o estado, está aqui, uf, por exemplo, Ceará. Mas a data não é daquela forma, nós temos uma data de início da semana epidemiológica. Dessa forma, já que não temos o dataset igual, aqui no dataset de chuvas nós temos dia a dia várias leituras, aqui nos casos de dengue, não, nós temos uma data de início da semana epidemiológica.

Então vemos aqui que nós temos um problema a resolver no sentido de juntar esses dois datasets, ou seja, no de chuvas nós temos um dia bem definido com várias leituras, dia 2, dia 3. Aqui no de casos de dengue, não, nós temos apenas uma data de início da semana epidemiológica, nós não temos dia a dia essa informação.

Mas o que nós temos aqui em comum? Nós temos em comum os meses, por exemplo. Nós temos o mês em comum, nós temos o ano, comum nos dois datasets, ou seja, eu tenho o ano 2015 e eu tenho o mês 9, mas eu não tenho aqui no dataset de casos de dengue o dia, por exemplo, o dia 2, o dia 3, assim como no dataset de chuvas.

Então vamos ter que talvez utilizar essa informação do mês e do ano para conseguir juntar os dois datasets relacionados à parte de data. Mas nós temos aqui nos casos de dengue já o estado, nós temos o estado também nesse dataset de chuvas, então nós podemos também utilizar por estado essa informação.

Veja que nós temos o campo de data separado pelo pipe, o divisor. Nós temos aqui a quantidade de casos, também separado pelo pipe, veja que nesse dia nós já tivemos um caso aqui em Abaiara.

Nós temos a cidade e o estado que nós vamos utilizar e já no outro, de chuvas, como eu falei, nós vamos ter a data, a quantidade de chuvas em milímetros e o estado, ou seja, nós vamos acabar utilizando essas três informações do dataset de chuvas e nós vamos utilizar a informação da data do início da semana epidemiológica do dataset de dengue, nós vamos utilizar o estado e a quantidade de casos.

Após essa pequena avaliação, que nós vamos tirar outros insights durante o tratamento, vimos que nós vamos ter que desprezar parte do dataset de casos de dengue para podermos unificar com o dataset de quantidade de chuvas.

Nós vamos utilizar aqui as três colunas, porém no dataset de caso de dengue, para poder unificar com o outro, mas nós teremos que utilizar aqui a coluna de data de início da semana epidemiológica, só que nós vamos ter que tratar utilizando apenas o mês e o ano, vamos utilizar a coluna do estado e vamos utilizar a coluna da quantidade de casos.

Agora que analisamos os dados e tiramos insights da forma pela qual trataremos essas informações, começaremos a criar as nossas pipelines, a partir dos insights que nós tiramos aqui agora.

Nós vamos utilizar apenas algumas colunas do dataset de casos de dengue porque o nosso dataset de quantidade de chuvas está bem limitado, ele tem apenas três colunas, então nós temos que tratar principalmente o dataset de casos de dengue para que ele se adeque ao dataset que nós vamos utilizar de quantidade de chuvas.

Sobre o curso Apache Beam: Data Pipeline com Python

O curso Apache Beam: Data Pipeline com Python possui 221 minutos de vídeos, em um total de 63 atividades. Gostou? Conheça nossos outros cursos de Engenharia de Dados em Data Science, ou leia nossos artigos de Data Science.

Matricule-se e comece a estudar com a gente hoje! Conheça outros tópicos abordados durante o curso:

Aprenda Engenharia de Dados acessando integralmente esse e outros cursos, comece hoje!

Conheça os Planos para Empresas