Esta postagem é uma atualização do que foi explicado neste post.
O Twitter é uma rede social que provê um canal de comunicação, por onde podemos nos conectar para acessar os tweets, através de stream. Existem várias aplicações interessantes para esta plataforma, muitas delas envolvendo o uso de inteligência artificial.
Todo o código usado para explicar este tópico está no seguinte repositório.
Para acessar o stream do twitter, vamos utilizar uma lib chamada twitter4j-stream, esta api que irá converter as chamadas do java para a api REST do twitter, e vice e versa. Para adicioná-la ao nosso projeto, basta colocar ela como dependência do build.sbt:
Uma outra dependência deste projeto será o playframework que atualmente está na versão 2.5.9."org.twitter4j" % "twitter4j-stream" % "4.0.2"
# Home page
GET / @controllers.ApplicationController.index
GET /stream @controllers.ApplicationController.stream(query: String)
# Map static resources from the /public folder to the /assets URL path
GET /assets/*file controllers.Assets.at(path="/public", file)
Acessando o stream do twitter.
Estas configurações serão feitas usando os seguintes parâmetros:
Config é onde iremos colocar as configurações, o configBuilder é quem irá criar as configurações e o twitterStream é o objeto que se conecta ao twitter e traz os tweets.private Configuration config;
private ConfigurationBuilder configBuilder = new ConfigurationBuilder();
public StatusListener statusListener;
public TwitterStream twitterStream;
FilterQuery query;
A configuração será feita, desta maneira:
configBuilder.setOAuthConsumerKey("consumer_key");configBuilder.setOAuthConsumerSecret("consumer_secret");configBuilder.setOAuthAccessToken("access_token");configBuilder.setOAuthAccessTokenSecret("token_secret");config = configBuilder.build();
twitterStream = new TwitterStreamFactory(config).getInstance();
Depois é só adicionar o objeto de query ao twitterStream:FilterQuery query = new FilterQuery().track(new String[]{searchQuery});
twitterStream.filter(query);
public void onStatus(Status status) { Logger.info(status.getUser().getName()); StringBuilder mensagem = new StringBuilder(); try { mensagem.append(status.getUser().getName()); mensagem.append(":"); mensagem.append(new String(status.getText().getBytes(), "utf-8")); mensagem.append("\n"); } catch (Exception e) { e.printStackTrace(); } socketActor.tell(ByteString.fromString(mensagem.toString()), ActorRef.noSender()); }
twitterStream.addListener(statusListener);
Akka e os atores
No Akka, existe uma classe chamada ActorSystem, e é esta classe que controla quais e quantas instâncias de atores estão disponíveis para serem utilizados. Quando um ator recebe uma mensagem, ele irá alocar uma instância de um ator daquele tipo, para processar aquela mensagem. Enquanto isto, se outras mensagens chegarem para serem processadas, elas ficarão armazenadas em uma fila, e é o sistema de atores que controla isto, até que haja um ator disponível para processar aquela mensagem.
Existem duas formas de se mandar uma mensagem para um ator, o tell e o ask. Ao chamar o método tell, devemos possuir uma lógica de chame e esqueça, o ator deverá conter toda a lógica, responsável por processar a mensagem, e nenhuma resposta será esperada deste método. Já ao chamar o método ask, a resposta que o ator devolve, é uma classe chamada Future, esta classe não contem em si o resultado, mas sim um link para quando ele estiver disponível.
Ao chamarmos o método ask estamos usando um método assíncrono, por isso, o código continua a rodar, e quando houver uma resposta, ou seja, quando o ator tiver processado a mensagem, podemos acessá-la através de alguns métodos como o map, o foreach, o onSucess, etc.
A única chamada que fazemos ao nosso ator é do tipo tell, já que só devemos enviar os tweets recebidos, para que o ator repasse ele para a interface UI. Esta chamada é feita dentro do método onStatus no StatusListener. Para criar o sistema de atores e o nosso ator, usamos o seguinte código:
private static ActorSystem system = ActorSystem.create("mixedTweets");
private static Materializer mat = ActorMaterializer.create(system);
static final Pair<ActorRef, Publisher<Object>> ti =
Source.actorRef(100, OverflowStrategy.fail()).
toMat(Sink.asPublisher(AsPublisher.WITHOUT_FANOUT),
Keep.both()).run(mat);
static ActorRef socketActor = ti.first();
static Publisher<Object> publisher = ti.second();
A criação do ActorSystem é bem simples, basta chamar o método create e dar um nome ao sistema.
Repare que todos os parametros daqui são static, isto é feito para evitar que tenhamos mais de uma instância destes parametros rodando na jvm, já que podemos ter várias instâncias de um controller rodando em um sistema. Caso haja mais de um sistema de atores, vai ser impossível garantir quem enviou e quem recebeu as mensagens.
O Materializer é uma fábrica que faz com que os streams do Akka rodem, e ele é utilizado ao final da linha que cria o Ator e o publisher. Para criar esta dupla, usa-se um Source, que é a fonte. Para criar ela, dizemos que a fila máxima de mensagens na fila é 100 e que o sistema irá falhar, caso este número exceda 100 (overflowstrategy.fail()).
Depois esta fonte é convertida em um Materializer, e nesta conversão fazemos uma sincronização com o Publisher, que é o objeto que irá publicar os dados das mensagens recebidas pelo ator. Finalmente chama-se o método run, que vai rodar o sistema pelo Materializer.
Repare que o Objeto retornado é um ActorRef, que é a instância do nosso ator, e um Publisher<Object>, é por isto que estamos fazendo o cast da mensagem para Objeto, para que o sistema entenda que a mensagem é um Object, e ela será publicada em um publisher de Objeto. Se não fizer esta conversão, o sistema não funciona devido aos tipos diferentes.
Controller
O método que le os streams, e retorna os valores para a interface é:
public play.mvc.Result stream(String queries) {
Source<Object, ?> source = Source.fromPublisher(publisher);
final Source<ByteString, ?> eventSource = source.map(ev ->
{
return ((ByteString) ev);
});
list.listenAndStream(queries, socketActor);
list.twitterStream.addListener(list.statusListener);
return ok().chunked(eventSource);
}
O método listenAndStream, retorna um Source do publisher e este Source (fonte) será utilizado para repassar os eventos para a interface.
Ao criar o objeto eventSource, estamos convertendo as mensagens recebidas em eventos, eve é uma String. Isto está sendo feito utilizando o lambda do Java 8, que é uma forma de programação funcional e já foi explicada aqui.
O último passo deste método é retornar uma resposta ok, chunked, que nada mais é do que em partes, já que o fluxo do stream é contínuo, devemos manter aberta a conexão entre o browser e o controller, para que seja possível continuar a enviar os dados para o usuário. Esta resposta chunked vai ser realizada através de um Fluxo de Eventos EventSource.flow() e ela vai retornar uma mensagem Http.
Nenhum comentário:
Postar um comentário