Pesquisar este blog

domingo, 6 de novembro de 2016

Twitter Stream com Akka 2.5 PlayFramework 2.6 no Java

Esta postagem é uma atualização do que foi explicado neste post.

O Twitter é uma rede social que provê um canal de comunicação, por onde podemos nos conectar para acessar os tweets, através de stream. Existem várias aplicações interessantes para esta plataforma, muitas delas envolvendo o uso de inteligência artificial.

Todo o código usado para explicar este tópico está no seguinte repositório.

Para acessar o stream do twitter, vamos utilizar uma lib chamada twitter4j-stream, esta api que irá converter as chamadas do java para a api REST do twitter, e vice e versa. Para adicioná-la ao nosso projeto, basta colocar ela como dependência do build.sbt:

"org.twitter4j" % "twitter4j-stream" % "4.0.2"
Uma outra dependência deste projeto será o playframework que atualmente está na versão 2.5.9.
Como vimos anteriormente, aqui, aqui e aqui, no play, há um arquivo de configuração das rotas, que é responsável por direcionar cada tipo de chamada, em cada caminho, a um método específico de um Controller.
Nosso arquivo de rotas terá o seguinte conteúdo:
# Home page
GET     /               @controllers.ApplicationController.index
GET     /stream    @controllers.ApplicationController.stream(query: String)

# Map static resources from the /public folder to the /assets URL path
GET /assets/*file controllers.Assets.at(path="/public", file)
Ou seja, teremos dois métodos get, um stream, que recebe um parâmetro chamado query, que será responsável por filtrar os tweets, e um método chamado index, que na verdade só fará um redirecionamento pro método stream, passando os atributos ruby e java.
A última dependência que teremos neste projeto, é o Akka, que é uma implementação do modelo de atores em Java e Scala.

Acessando o stream do twitter.

O primeiro passo para conseguir processar um stream do twitter, é criar uma app nele, e isto pode ser feito na página de dev do twitter. Lá existem instruções de como criar a sua app.
Depois de criada, você terá disponível 4 chaves, 2 de Consumer (Key e Secret) e 2 de Acesso ( AccessToken e AccessTokenSecret). Este tipo de autenticação obedece ao padrão OAuth.
Estas configurações serão feitas usando os seguintes parâmetros:
private Configuration config;
private ConfigurationBuilder configBuilder = new ConfigurationBuilder();
public StatusListener statusListener;
public TwitterStream twitterStream;
FilterQuery query;
Config é onde iremos colocar as configurações, o configBuilder é quem irá criar as configurações e o twitterStream é o objeto que se conecta ao twitter e traz os tweets.
A configuração será feita, desta maneira:
configBuilder.setOAuthConsumerKey("consumer_key");
configBuilder.setOAuthConsumerSecret("consumer_secret");
configBuilder.setOAuthAccessToken("access_token");
configBuilder.setOAuthAccessTokenSecret("token_secret");
config = configBuilder.build(); 
twitterStream = new TwitterStreamFactory(config).getInstance();
 Agora temos uma instância do twitterStream e é ele que iremos usar para se conectar à api do twitter, aliás, isto já foi feito, quando usamos o método getInstance();
O próximo objeto que iremos usar aqui é o SearchQuery, este objeto recebe os valores que queremos usar para pesquisar os tweets, no nosso exemplo será usado apenas a pesquisa por palavra, ou seja, uma palavra que queremos que esteja presente em todos os tweets. Existem outras formas de se pesquisar os tweets, como a localização (através da latitude e da longitude), a língua do tweet e os usuários que queremos seguir.
A construção deste objeto é  feita da seguinte maneira:
FilterQuery query = new FilterQuery().track(new String[]{searchQuery});
Depois é só adicionar o objeto de query ao twitterStream:
twitterStream.filter(query);
Agora todos os tweets serão filtrados de acordo com a nossa query, que no caso será por palavras. O último objeto que será utilizado é o StatusListener, este objeto escuta os eventos do stream, e quando um tweet chega, um método dele será chamado. Este objeto segue o padrão de listener, que é o mesmo utilizado pela api Swing do Java, e ele funciona exatamente da mesma maneira. Este objeto foi criado da seguinte maneira:
           public void onStatus(Status status) {
                Logger.info(status.getUser().getName());
                StringBuilder mensagem = new StringBuilder();
                try {
                    mensagem.append(status.getUser().getName());
                    mensagem.append(":");
                    mensagem.append(new String(status.getText().getBytes(), "utf-8"));
                    mensagem.append("\n");
                } catch (Exception e) {
                    e.printStackTrace();

                }
                socketActor.tell(ByteString.fromString(mensagem.toString()),
                        ActorRef.noSender());
            }
Aqui é a prirmeira vez que temos contato com um ator, o socketActor, mas o seu papel no sistema será explicado mais tarde.
Repare que o StatusListener possui um método chamado onStatus, que está sendo sobrescrito aqui, na verdade existem outros métodos do StatusListener que deverão ser sobrescritos, mas neste caso de uso, somente o onStatus é que interefere no comportamento do aplicativo e por isto, somente ele será descrito.
Logger.info é uma forma de escrevermos no log da aplicação. Repare que estamos escrevendo lá o nome dos usuários dos tweets recebidos.
O próximo passo agora é pegar os dados do tweet, que neste caso são o texto e o nome do usuário. Todos estes dados serão enviados para o ator. Repare que estamos usanso um cast (Object) para objeto, logo logo isto será explicado.
Depois de configurar o Listener, deve-se adicioná-lo ao twitterStream:
twitterStream.addListener(statusListener);

Akka e os atores

O modelo de atores é um modelo que podemos chamar de "bem simples". Trata-se de uma estrutura, onde criamos os atores, e estes atores são programados para processar um tipo de mensagem. As mensagens, como padrão, são classes static, que podem, ou não, possuir atributos. Os atores, não tem estado, ou seja, não possuem um parametro dentro dele que pode ser alterado pelo processamento de uma mensagem. 

No Akka, existe uma classe chamada ActorSystem, e é esta classe que controla quais e quantas instâncias de atores estão disponíveis para serem utilizados. Quando um ator recebe uma mensagem, ele irá alocar uma instância de um ator daquele tipo, para processar aquela mensagem. Enquanto isto, se outras mensagens chegarem para serem processadas, elas ficarão armazenadas em uma fila, e é o sistema de atores que controla isto, até que haja um ator disponível para processar aquela mensagem. 

Existem duas formas de se mandar uma mensagem para um ator, o tell e o ask. Ao chamar o método tell, devemos possuir uma lógica de chame e esqueça, o ator deverá conter toda a lógica, responsável por processar a mensagem, e nenhuma resposta será esperada deste método. Já ao chamar o método ask, a resposta que o ator devolve, é uma classe chamada Future, esta classe não contem em si o resultado, mas sim um link para quando ele estiver disponível. 

Ao chamarmos o método ask estamos usando um método assíncrono, por isso, o código continua a rodar, e quando houver uma resposta, ou seja, quando o ator tiver processado a mensagem, podemos acessá-la através de alguns métodos como o map, o foreach, o onSucess, etc.

A única chamada que fazemos ao nosso ator é do tipo tell, já que só devemos enviar os tweets recebidos, para que o ator repasse ele para a interface UI. Esta chamada é feita dentro do método onStatus no StatusListener. Para criar o sistema de atores e o nosso ator, usamos o seguinte código:

private static ActorSystem system = ActorSystem.create("mixedTweets");
    private static Materializer mat = ActorMaterializer.create(system);
    static final Pair<ActorRef, Publisher<Object>> ti =
            Source.actorRef(100, OverflowStrategy.fail()).
                    toMat(Sink.asPublisher(AsPublisher.WITHOUT_FANOUT),
                            Keep.both()).run(mat);
    static ActorRef socketActor = ti.first();
    static Publisher<Object> publisher = ti.second();

A criação do ActorSystem é bem simples, basta chamar o método create e dar um nome ao sistema.

Repare que todos os parametros daqui são static, isto é feito para evitar que tenhamos mais de uma instância destes parametros rodando na jvm, já que podemos ter várias instâncias de um controller rodando em um sistema. Caso haja mais de um sistema de atores, vai ser impossível garantir quem enviou e quem recebeu as mensagens.

O Materializer é uma fábrica que faz com que os streams do Akka rodem, e ele é utilizado ao final da linha que cria o Ator e o publisher. Para criar esta dupla, usa-se um Source, que é a fonte. Para criar ela, dizemos que a fila máxima de mensagens na fila é 100 e que o sistema irá falhar, caso este número exceda 100 (overflowstrategy.fail()). 

Depois esta fonte é convertida em um Materializer, e nesta conversão fazemos uma sincronização com o Publisher, que é o objeto que irá publicar os dados das mensagens recebidas pelo ator. Finalmente chama-se o método run, que vai rodar o sistema pelo Materializer.

Repare que o Objeto retornado é um ActorRef, que é a instância do nosso ator, e um Publisher<Object>, é por isto que estamos fazendo o cast da mensagem para Objeto, para que o sistema entenda que a mensagem é um Object, e ela será publicada em um publisher de Objeto. Se não fizer esta conversão, o sistema não funciona devido aos tipos diferentes.

O que fizemos aqui, foi dizer que as mensagens recebidas pelo ator, serão publicadas para a nossa interface, mas antes disto devemos ter uma resposta do método que le os streams para a interface, usando o publisher.

Controller

O método que le os streams, e retorna os valores para a interface é:
public play.mvc.Result stream(String queries) {
        Source<Object, ?> source = Source.fromPublisher(publisher);
        final Source<ByteString, ?> eventSource = source.map(ev ->
        {
            return ((ByteString) ev);
        });
        list.listenAndStream(queries, socketActor);
        list.twitterStream.addListener(list.statusListener);

        return ok().chunked(eventSource);
    }

O método listenAndStream, retorna um Source do publisher e este Source (fonte) será utilizado para repassar os eventos para a interface.

Ao criar o objeto eventSource, estamos convertendo as mensagens recebidas em eventos, eve é uma String. Isto está sendo feito utilizando o lambda do Java 8, que é uma forma de programação funcional e já foi explicada aqui.

O último passo deste método é retornar uma resposta ok, chunked, que nada mais é do que em partes, já que o fluxo do stream é contínuo, devemos manter aberta a conexão entre o browser e o controller, para que seja possível continuar a enviar os dados para o usuário. Esta resposta chunked vai ser realizada através de um Fluxo de Eventos EventSource.flow() e ela vai retornar uma mensagem Http.