Alura > Cursos de Data Science > Cursos de Engenharia de Dados > Conteúdos de Engenharia de Dados > Primeiras aulas do curso Apache Airflow: orquestrando seu primeiro pipeline de dados

Apache Airflow: orquestrando seu primeiro pipeline de dados

Pipeline de dados - Apresentação

Bem-vindo ao curso de Apache Airflow: orquestrando seu primeiro pipeline de dados! Meu nome é Millena Gená e eu serei sua instrutora nessa jornada!

Millena Gená é uma mulher branca de olhos castanhos e cabelos escuros e cacheados. Veste uma camiseta cinza e está sentada em uma cadeira gamer de cor azul. Ao fundo, um ambiente de iluminação verde claro, e uma mesa com livros da saga Harry Potter, uma pelúcia de polvo roxo e um funko da personagem Eleven, de Stranger Things.

Neste curso, entenderemos o que é o Airflow, quais seus principais conceitos, como DAGs, tasks e operators, e os componentes da arquitetura desta ferramenta. Aprenderemos, ainda, a instalá-lo e conheceremos, também, os recursos fundamentais de sua interface.

Em seguida, desenvolveremos um DAG inicial, com diferentes tarefas, para que possamos compreender essa estruturação. Depois, implementaremos um mais complexo para auxiliar uma empresa de turismo a obter dados semanais sobre a previsão do tempo de determinada localidade.

Para melhor aproveitar essa formação, é importante que você tenha conhecimento prévio em linguagem Python e comandos Linux, para trabalharmos no terminal.

Vamos lá? Te aguardo nos próximos vídeos!

Pipeline de dados - Entendendo o problema

Vamos supor que fomos contratados por uma empresa de turismo, situada em Boston, para desenvolver um pipeline de dados, ou, em inglês, data pipeline.

Um pipeline de dados é basicamente um conjunto de ações que realizará o processamento de dados específicos.

A intenção da empresa é que o pipeline extraia a previsão do tempo da cidade de Boston nos próximos 7 dias e armazene esses dados, assim, ela poderá usá-los para organizar a programação de passeios de acordo com a previsão do tempo.

Na internet, existem diversas APIs que trazem informações climáticas sobre diferentes lugares do mundo e nós utilizaremos uma delas, a Visualcrossing. Vamos acessar o site https://www.visualcrossing.com e logar.

Há uma atividade que ensina o passo a passo para que você crie uma conta no site da API.

Ao logar, surgem diversas informações sobre a sua conta. A principal delas, é a key ("chave" em português), que será necessária para que possamos extrair os dados fornecidos pela API - vale ressaltar que cada conta possui uma chave específica. Além disso, é importante que saibamos que, assim como qualquer outra API, esta também possui limitações. Como estamos utilizando a versão gratuita, temos direito de realizar até 1000 requisições por dia.

Vamos acessar a documentação da API para melhor entender seu funcionamento. Para isso, clique em "More", na barra superior, e, em seguida, em "Documentation". Note que há diversos artigos sobre diferentes maneiras de extrair dados, mas vamos nos ater ao artigo "Timeline Weather API". Clique para abri-lo.

Esta API nos permite acessar dados antigos e atuais sobre o clima, além da previsão do tempo. Mais abaixo, após "API Request Types", há uma url que devemos utilizar para conseguir extrair dados da API. Perceba que, nela, constam campos que deveremos informar à url, como "location", "date" e "key", ou seja, localização, data e chave, respectivamente. Em seguida, estão listados outros parâmetros que também podem ser adicionados à url.

Caso seja do seu interesse, dê uma lida na documentação para entender as possibilidades que esta API nos proporciona.

Já sabemos o trabalho que precisamos fazer e encontramos uma API que nos auxiliará na realização, agora podemos desenvolver o data pipeline. Vamos lá?

Pipeline de dados - Extraindo dados climáticos

Vamos iniciar o desenvolvimento do pipeline de dados!

Primeiramente, criaremos uma pasta, dentro dos documentos da nossa máquina, chamada "datapipeline". Em seguida, a abriremos no VS Code - editor de código que utilizaremos neste curso. Para abri-la, basta clicar em "Open Folder", na aba inicial do Visual Studio, e selecionar a pasta em questão.

Com a pasta aberta, crie um novo arquivo clicando em "New File", o primeiro símbolo à direita do nome da pasta, e o nomeie como "extrai_infos_clima.py". Em seguida, adicionaremos o código de importação de algumas bibliotecas que precisaremos utilizar:

import os
from os.path import join
import pandas as pd
from datetime import datetime, timedelta

A biblioteca OS é utilizada para manipulação de arquivos e caminhos; a função join() é usada para juntar dois caminhos; Pandas usa-se para manipulação de dataframe, e Datetime para trabalhar com manipulação de dados.

Agora, podemos definir algumas variáveis que teremos de especificar para a url de requisição. Para isso, utilizaremos o seguinte código:

# intervalo de datas
data_inicio = datetime.today()
data_fim = data_inicio + timedelta(days=7)

# formatando as datas
data_inicio = data_inicio.strftime('%Y-%m-%d')
data_fim = data_fim.strftime('%Y-%m-%d')

city = 'Boston'
key = 'ANZQ5K8QQP8BXZ85F4EQ2FPK'

Vamos entender a estrutura deste código!

Inicialmente, definimos o intervalo de datas. Dessa forma, data_inicio é definida como o momento em que o código é executado, enquanto data_fim corresponde ao valor de data_inicio + 7 dias, performando o intervalo de uma semana. Em seguida, formatamos essas datas, pegando somente ano, mês e dia, uma vez que datetime.today() retorna, também, minutos e horas, informações que não agregam ao nosso projeto. Por fim, as duas últimas variáveis definem a cidade (city) sobre a qual queremos extrair os dados e a chave que pegamos na API (key).

Lembre-se que esta chave corresponde à conta do instrutor. Você deve usar a chave que a API fornece para a sua conta.

Agora que definimos essas informações, podemos passá-las para a url:

URL = join('https://weather.visualcrossing.com/VisualCrossingWebServices/rest/services/timeline/',
            f'{city}/{data_inicio}/{data_fim}?unitGroup=metric&include=days&key={key}&contentType=csv')

Estamos utilizando a função join() porque dividimos a url em duas partes: a primeira corresponde à url da documentação, enquanto a segunda, onde há f string, elenca as variáveis definidas anteriormente, como cidade e intervalo de datas, além de outros parâmetros que podem ser interessantes para o nosso projeto, como o sistema de unidades (unitGroup), condensação dos dados em dias (include=days), a chave (key) e o formato com o qual desejamos que os dados venham (csv).

Como especificamos que os dados devem vir no formato csv, podemos utilizar a função do Pandas read_csv() para lê-los. Faremos isso criando uma variável dados e chamando pd.read_csv, que receberá URL. Em seguida, chamaremos o comando de impressão print(dados.head) para que possamos ver as primeiras linhas desses dados no terminal.

dados = pd.read_csv(URL)
print(dados.head())

Pressione "Ctrl + S" para salvar e vamos testar o nosso código.

Vamos abrir o terminal teclando "Ctrl + J" e digitar o comando python3 extrai_infos_clima.py.

python3 extrai_infos_clima.py

Ao executar o comando, nos são retornadas as 5 primeiras linhas dos dados extraídos da API. Note que há várias colunas e não é possível renderizar todas elas, por isso são representadas por reticências ....

Agora, precisamos salvar esses arquivos em uma pasta. Vamos fechar o terminal e criar a variável file_path que receberá o caminho da pasta na qual queremos salvá-los. Para isso, acesse a barra lateral, clique sobre ela e, em seguida, no botão direito do mouse, selecione "Copy Path" e cole na variável criada.

file_path = 'home/millenagena/Documents/datapipeline'

Queremos criar uma subpasta para salvar esses dados, então acrescentaremos / e o nome que queremos dar a essa nova pasta, que será semanas. Em seguida, adicionamos o sinal de igualdade = e entre chaves passamos {data_inicio}. Como passamos uma variável à url do caminho, o transfomaremos em uma f string.

file_path = f'home/millenagena/Documents/datapipeline/semana={data_inicio}/'

Para que essa pasta seja criada, podemos utilizar o comando mkdir da biblioteca OS.

file_path = f'home/millenagena/Documents/datapipeline/semana={data_inicio}/'
os.mkdir(file_path)

Agora que definimos o caminho e criamos uma pasta que terá a data de início da extração, vamos salvar esses dados em csv utilizando o método to_csv da biblioteca Pandas. Este é o trecho de código que usaremos:

dados.to_csv(file_path + 'dados_brutos.csv')
dados[['datetime', 'tempmin', 'temp', 'tempmax']].to_csv(file_path + 'temperaturas.csv')
dados[['datetime', 'description', 'icon']].to_csv(file_path + 'condicoes.csv')

Observe que estamos salvando em 3 arquivos diferentes. No primeiro, passamos o caminho da pasta e o nome que queremos que esse arquivo tenha (dados_brutos.csv). Como vimos, esses dados possuem muitas colunas, então nos 2 arquivos seguintes, definimos aquelas que são mais relevantes para o nosso contexto como temperaturas média, mínima e máxima, e descrição do dia, salvando-os como temperaturas.csv e condicoes.csv, respectivamente.

Esta é a configuração final do nosso código:

import os
from os.path import join
import pandas as pd
from datetime import datetime, timedelta

# intervalo de datas
data_inicio = datetime.today()
data_fim = data_inicio + timedelta(days=7)

# formatando as datas
data_inicio = data_inicio.strftime('%Y-%m-%d')
data_fim = data_fim.strftime('%Y-%m-%d')

city = 'Boston'
key = 'ANZQ5K8QQP8BXZ85F4EQ2FPK'

URL = join('https://weather.visualcrossing.com/VisualCrossingWebServices/rest/services/timeline/',
            f'{city}/{data_inicio}/{data_fim}?unitGroup=metric&include=days&key={key}&contentType=csv')

dados = pd.read_csv(URL)
print(dados.head())

file_path = f'home/millenagena/Documents/datapipeline/semana={data_inicio}/'
os.mkdir(file_path)

dados.to_csv(file_path + 'dados_brutos.csv')
dados[['datetime', 'tempmin', 'temp', 'tempmax']].to_csv(file_path + 'temperaturas.csv')
dados[['datetime', 'description', 'icon']].to_csv(file_path + 'condicoes.csv')

Vamos salvá-lo e testá-lo no terminal com o comando python3 extrai_infos_clima.py.

python3 extrai_infos_clima.py

Ao executar, os dados são printados novamente. Mas, para nos certificarmos que o código realmente funcionou, abrimos a barra lateral e observamos se a pasta foi criada com a data de hoje. Dentro dela devem estar os arquivos extraídos condicoes.csv, dados_brutos.csv e temperaturas.csv. Nosso pipeline de dados funcionou! Criamos uma pasta com a data de início da extração na qual são armazenados os dados extraídos dos 7 dias a frente desta data. No entanto, temos um problema: as demais pessoas da empresa terão que executá-lo manualmente para obter os dados e trabalhar em cima deles.

O ideal, então, é que nosso pipeline seja mais automático e possa ser executado em um dia determinado. Dessa forma, é necessário que o processo seja agendado para acontecer sempre em um dia específico de forma automatizada. Será que é possível? Veremos nos próximos vídeos!

Sobre o curso Apache Airflow: orquestrando seu primeiro pipeline de dados

O curso Apache Airflow: orquestrando seu primeiro pipeline de dados possui 109 minutos de vídeos, em um total de 57 atividades. Gostou? Conheça nossos outros cursos de Engenharia de Dados em Data Science, ou leia nossos artigos de Data Science.

Matricule-se e comece a estudar com a gente hoje! Conheça outros tópicos abordados durante o curso:

Aprenda Engenharia de Dados acessando integralmente esse e outros cursos, comece hoje!

Conheça os Planos para Empresas