Pesquisar este blog

domingo, 18 de outubro de 2020

Exemplo de Spark Stream com Twitter

 No último post vimos como instalar o Apache Spark com docker, agora vamos mostrar um exemplo de como usar sua api de Stream pra ler dados do Twitter e processá-los.

Data Lakes

Muitas vezes existe mais de um sistema rodando dentro da empresa, e cada um deles possui um serviço de armazenamento, uma hora será necessário juntar os dados de todos estes sistemas em apenas um lugar, para que seja possível fazer uma análise completa dos dados da empresa. 

Aqui mesmo neste blog temos um post explicando uma das limitações dos bancos de dados, conforme o volume de dados aumenta, muitas vezes os sistemas começam a ficar travados e lentos, e por isso uma outra cópia do armazenamento de dados deverá ser feita, este é um dos fatores que geram esta necessidade de separar os dados para relatórios, dos dados de produção.

Por estes motivos, é cada vez mais comum a criação de Data Lakes dentro das empresas. Estes sistemas centralizam os dados das empresas, neles são inseridos os dados de todos os sistemas que rodam dentro do daquele ambiente e a partir do data lake  são gerados os relatórios, que entre outras coisas, podem cruzar os dados de todos os sistemas que rodam naquela empresa.

Stream

 Stream tem sido muito usado para sincronizar sistemas de armazenamento de dados, por isso os Data Lakes se aproveitam bastante deste modelo de comunicação de sistemas.

Alguns dados não precisam ser copiados assim que são criados, então sistemas de baches e jobs espaçados, que rodam a cada 3h por exemplo, resolvem este tipo de sincronização, mas existem alguns dados, geralmente os dados do coração do produto das empresas, estes sim, necessitam estar quentes ou seja, assim que são criados eles precisam ser replicados, pois fatalmente existem alguns relatórios que precisam ver eles em tempo real.

É com o uso de streams que estes dados serão copiados, com eles é possível criar réplicas com até 30 segundos de diferença, o que é algo bem quente. Imagina que uma cópia da tabela principal da empresa, pode estar atualizada com 30 segundos de latência.

Estes streams, geralmente usam sistemas de filas, onde todas as modificações dos bancos são postadas, as vezes usa-se os logs de alteração do banco, para postar nestas filas. A partir da fila criamos um sistema de leitura por stream, que receberá os dados assim que eles chegam na fila. Um outro exemplo são os streams criados diretamente dos sistemas de arquivos, ou seja, assim que o dado é escrito no disco, ele já estará disponível no stream para ser processado.

Apache Spark Stream

O Stream do Spark é um pouco diferente do conceito de stream que estamos acostumados, ao invés de ser um fluxo de dados contínuo, neste caso ele trabalha com microbatches, ou seja configuramos serviços que irão ler do stream de tempos em tempos, e tanto as janelas, quanto o volume de dados lido, são parâmetros configuráveis e isso implica num fluxo quase contínuo de dados. Existem sistemas como o Apache Flink, que possuem um sistema de Stream direto, onde os dados são processados assim que eles chegam e não há um intervalo configurável.

A diferença crucial é o tempo, um sistema de stream direto os dados chegam quase que em tempo real, em um sistema de microbatches ele pode ter até 2x a janela de tempo do intervalo entre as chamadas do microbatch.

Preparação do ambiente

Para testar o stream do Spark, vamos usar uma conexão de Stream do twitter, algo que já foi explicado neste post. Desta vez não vamos usar um cluster, mas sim uma versão standalone do spark rodando dentro de um notebook jupyter.

Aqui vamos usar um notebook Jupyter, com um kernel de Scala. Kernel é o core de execução dos notebooks, por padrão eles rodam python, mas existem algumas alternativas que rodam o Scala, dentre outras linguagens, neste caso estamos usando o almond, que é uma implementação opensource de kernel do Scala no Jupyter.

Para rodar uma imagem docker com o spark usando este projeto, usa-se o seguinte comando:

docker run -it --rm -p 8888:8888 almondsh/almond:latest

Ao iniciar a imagem, ele irá imprimir uma série de logs, mas devemos prestar atenção a um deles especificamente, caso você não iteraja com a imagem, ao final ele irá imprimir um endereço parecido com este:

http://127.0.0.1:8888/?token=a307c1b97755d83deafd2361a2a2fe5a5817c211968e5a97

Aqui temos o endereço de conexão, localhost porta 8888, além do token de conexão, este token é usado para garantir que apenas quem deve, consiga acessar, ler e executar os notebooks neste ambiente.

Por padrão esta imagem roda com a versão 2.13 do Scala, mas no nosso exemplo estaremos usando a versão 2.12, por isso certifique-se de alterar o kernel que está sendo executado com a versão correta do Scala. Esta modificação pode ser feita no menu em:

Kernel > Change Kernel > Scala 2.12

Desta vez estaremos usando a versão 2.4.0, pois esta é a última versão que possui compatibilidade com as bibliotecas do Twitter Stream.

Spark Stream com Twitter

Agora, o último passo será executar uma conexão de Stream do Spark com o Twitter, para isso será necessário ter as chaves de acesso, para isso cada um terá que criar um aplicativo dentro da sua conta do Twitter. 

O primeiro passo será instalar as dependencias, e o almond facilita bastante nesta parte, pois ele permite que estas dependências sejam instaladas pelo próprio notebook.

import $ivy.`org.apache.spark::spark-sql:2.4.0`
import $ivy.`org.apache.spark::spark-streaming:2.4.0`
import $ivy.`org.apache.spark::spark-core:2.4.0`
import $ivy.`org.apache.bahir::spark-streaming-twitter:2.4.0`
import $ivy.`sh.almond::almond-spark:0.10.8`
import org.apache.log4j.{Level, Logger}

import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds

import twitter4j.conf.ConfigurationBuilder
import twitter4j.auth.OAuthAuthorization
import twitter4j.Status

import org.apache.spark.streaming.twitter.TwitterUtils

Logger.getLogger("org").setLevel(Level.OFF)

As primeiras 5 linhas, instalam as dependencias, tanto do Spark, quanto do twitter, estes comandos irão baixar os jars necessários, diretamente dos repositórios maven, e adicioná-los ao contexto de execução do notebook.

No último comando, estamos desligando os logs, pois neste caso não iremos usar, e eles são muito verbosos.

Um ponto importante a levantar aqui é, devemos sempre fazer backups dos nossos notebooks, pois toda vez que paramos a execução e iniciamos ela novamente, os notebooks criados ali serão perdidos. No nosso caso isso não impacta muito pois o caso de exemplo é um notebook bem simples.

No próximo passo iremos criar o contexto de Stream do Spark, e é nele que iremos configurar as propriedades dos nossos microbatches:

 val appName = "TwitterSparkDSD"
 val conf = new SparkConf()
 conf.setAppName(appName).setMaster("local[*]")
 val ssc = new StreamingContext(conf, Seconds(5))

Neste passo definimos algumas configurações básicas, como o nome do aplicativo, assim comoa localização do master do Spark, que neste caso está rodando localmente, e iniciamos o contexto do streaming. Repare que no segundo argumento do contexto do streaming, passamos uma variável temporal de 5 segundos. Isso significa que, a cada 5 segundos o spark iniciará um processo de microbatch para ler os dados desta fonte. 

É muito importante notar que, o tempo de processamento de cada batch, deve ser menor do que 5 segundos, neste caso, senão isso significa que o nosso cluster não está sendo capaz de processar todos os dados que estão chegando. Neste caso os dados podem ficar acumulados e até travar o cluster.

Agora chegou a hora de criar o objeto de autenticação no sistema do twitter, isso será criado a partir do código:

val consumerKey = ""
val consumerSecret = ""
val accessToken = ""
val accessTokenSecret = ""

val cb = new ConfigurationBuilder
cb.setDebugEnabled(true).setOAuthConsumerKey(consumerKey)
      .setOAuthConsumerSecret(consumerSecret)
      .setOAuthAccessToken(accessToken)
      .setOAuthAccessTokenSecret(accessTokenSecret)
val auth = new OAuthAuthorization(cb.build)

Repare que aqui todos os tokens de acesso estão em branco, estes dados estão disponíveis na página da sua aplicação, dentro do twitter, substitua cada um deles pelos valores gerados para o seu aplicativo.

Agora vamos nos conectar a api do Twitter, isso será feito com o seguinte comando:

val tweets = TwitterUtils.createStream(ssc, Some(auth))

Repare que aqui foi criado um stream de tweets, mas realmente precisamos de 2 streams? Já não tinha sido criado um stream do spark?

Bom a ideia deste pedaço de código é criar a conexão de leitura, passamos os dados de autenticação, além do stream do spark, este método irá conectar o contexto de Stream do Spark, com a api de stream do twitter.

O volume de dados que chega do Twitter é obviamente muito grande, por isso vamos filtrar para conseguir provar que o código realmente está funcionando, para isso faremos o seguinte filtro:

val dsdTweets = tweets.filter(_.getText() == "#DSD")
dsdTweets.saveAsTextFiles("tweets", "json")

Com este filtro, serão considerados apenas os casos em que os tweets tiverem a string #DSD dentro do texto digitado. Assim dá pra fazer um teste ao vivo e twittar um conteúdo com este texto e validar se ele realmente vai aparecer gravado nos arquivos. Vejam que no comando seguinte, os dados estão sendo gravados como arquivos texto com o json de conteúdo dos tweets recebidos. Repare também, que as duas strings passadas como argumento para o método que salva os dados, são tweets e json, elas serão usadas para criar diretórios, cada batch gravará um diretório iniciando com tweets e terminados como json. Para não haver conflito com estes diretórios, o timestamp será adicionado ao nome de cada um deles, assim não teremos dois batches gravando no mesmo diretório.

Esta api é aberta e pode ser usada gratuitamente, mas quando estamos usando ela sem pagar, não acessaremos todos os tweets da base, para fazer isso precisamos pagar para o twitter, por ser de graça ela tem alguns limites e, por isso, talvez alguns tweets do seu teste serão perdidos.

Falta ainda um passo para terminar o nosso leitor de tweet, que é inicializar o processo:

ssc.start()
ssc.awaitTermination()

Aqui foram 2 comandos, o primeiro inicia o stream, e o segundo diz quando que o job de stream irá terminar. Com este comando, nosso stream só vai terminar quando o job falhar, ou quando ele for manualmente parado. Para desligar o job manualmente, é só parar o kernel do jupyter, isso fará que toda a execução seja desligada.

Para validar que conseguimos acessar algum tweet, vamos checar o sistema de arquivos, isso pode ser feito, acessando a tab inicial do jupyter, conforme mostra a imagem abaixo:

Aqui são mostrados vários diretórios, isso significa que o processo de salvamento dos dados funcionou, agora está noa hora de acessar estes diretórios para ver se os dados realmente estão lá:

Repare que temos vários arquivos nomeados part-xxxxx, isso acontece pois cada executor do spark irá gravar um arquivo separado. Cada executor do spark processará parte dos dados, e ele tem a capacidade de conectar cada executor para ler em paralelo da api, com isso a velocidade de processamento dos dados será em paralelo.

Veja que alguns arquivos tem dados, e outros não, isso significa que alguns executores tiveram dados para processar, mas outros não.

Dentro dos arquivos é possível ver os dados de alguns tweets, não vou postar aqui pois isso vai ser diferente cada vez que você acessar, então não faz sentido.