Pesquisar este blog

segunda-feira, 28 de setembro de 2020

Criando um cluster de Apache Spark com o Docker Compose

Este post segue uma sequência de explicações a respeito dos containers, se você não sabe o que são containers, comece por aqui, onde são explicados como eles funcionam e de onde eles surgiram. 

Temos também, neste post, um exemplo de como instalar um banco de dados usando o Docker.

Aqui será mostrado como criar um cluster com o Docker, algo que poderá ser feito em uma máquina comum, na sua casa, se você não sabe o que é um cluster, veja aqui a explicação.

Introdução

Antes da versão 1.12, para criar uma cluster com o Docker, era necessário utilizar um serviço de armazenamento de chaves externo, mas isso mudou  com o aparecimento do Swarm. O Swarm é uma forma nativa de configurar um cluster com containers do Docker, com ele você pode tanto criar clusters em sua máquina local, como em plataformas de cloud.

Além do swarm, existem outras formas de criarmos clusters com o docker, dentre elas podemos citar o Apache Mesos, OpenShift e o Kubernetes.

A criação de clusters com containers permite a criação de diversos tipos de ambientes, incluindo banco de dados poderosos, espalhados em muitos containers, um serviço web que escalona rapidamente, conforme sua necessidade. Os containers possuem uma versatilidade muito interessante, sendo possível até, criar um cluster usando dispositivos RaspberryPi.

Mas o que é o Spark?

Histórico:

O Apache Spark é um cluster de processamento de dados em paralelo, tudo começou com o algoritmo de Map (mapeamento)/Reduce (computação dos resultados) desenvolvido para processar os dados de indexação das páginas HTML que seriam mostradas no Google, e com isso nasceu o famoso termo BigData, que remete ao processamento de um grande volume de dados. Conforme as empresas vão evoluindo seus produtos, mais e mais é necessário produzir relatórios mais complexos, os quais necessitam ligações e cruzamento dos dados e com um volume cada vez maior. 

A concorrência, pelo acesso aos dados no banco RDBMS fica cada vez maior e com isso, conforme demonstrado na explicação sobre travas em banco de dados RDBMS o acesso aos dados fica cada vez mais lento, e outros sistemas precisaram ser desenvolvidos para dar conta destes processos.

O primeiro sistema que apareceu foi o Apache Hadoop, que rapidamente ganhou populadidade, por ser um dos primeiros sistemas de cluster a permitir o processamento de dados em paralelo, de maneira open source. Ao redor dele, foram criados diversos softwares que permitiram acessar queries SQL em paralelo, como o Hive, um sistema de arquivos, o HDFS, um sistema de algoritmos de Machine Learning, como o Mahout

Todos estes sistemas criaram um ecosistema ao redor do Hadoop, e isso fez com que ele fosse usado em muitos lugares, mas uma das suas características era ler e escrever os dados no sistema de arquivos, toda vez que eles seriam processados e com o tempo, isso acabou ficando custoso.

O Spark, surgiu como uma ideia de acelerar este processamento de dados, e para isso ele, ao invés de escrever os dados no hd, resolveram fazer com que ele armazenasse os dados na memória. Isso fez com que o Spark fosse até 100 vezes mais rápido que o Hadoop, e isso atraiu muita atenção para este projeto. Em 2015, ele era o projeto opensource que mais se desenvolvia no mundo.

Ele foi escrito utilizando a linguagem Scala, que roda dentro da JVM, com isso ele já teria compatibilidade tanto com o Java, quanto com o próprio Scala. Dentro do Spark, existe uma implementação de DSL do SQL, chamado SparkSQL, que permite acessar os dados dos seus Dataframes usando SQL. Depois foram adicionados processadores de Python e R. o que trouxe uma flexibilidade muito boa para este projeto, pois ele abrange linguagens bem populares. O Java, o Python, o Scala e o R, que apesar de não ser muito conhecido pela maioria dos técnicos ela é bem conhecida na área de estatística. Atualmente estão sendo desenvolvidos implementações para o Go e o Kotlin, mostrando que a evolução da plataforma continua.

Esta flexibilidade também torna possível escrever programas em diversas linguagens, aproveitando o ponto forte e as bibliotecas de cada uma delas. R é muito conhecido por ter muita coisa de estatística implementada como biblioteca, assim caso, o trabalho a ser desenvolvido, precise de uma biblioteca bem específica, pode ser mais fácil aproveitar algo que já está implementado no R, do que desenvolver uma lib do zero.

O Spark é subdividido em Core, que possui as funcionalidades básicas, o SparkSQL, que implementa o SQL ansi nesta plataforma, a MLLib, que é a biblioteca de Machine Learning, o Stream, que é uma implementação de leitura de dados em Streaming.

Arquitetura:

O Spark contem, duas partes principais, o Driver e os executores e cada um deles possui um papel diferente nesta arquitetura. 

Ao executar o Spark, cria-se um contexto de execução e este contexto fica alocado no Driver. O Driver é a máquina que recebe os comandos e os resultados das computações realizadas no Spark. Cada comando recebido será distribuído para serem executados pelos executores.

Toda vez que algum dado é carregado no Spark, os dados são distribuídos entre os executores. Todos os dados são armazenados em três tipos de distribuição, os Datasets, os Dataframes e os RDDs e cada uma delas possui uma abstração diferente.

Os RDDs são datasets resilientes e distribuídos, mas o que significa isso? Imagine que você carregue uma coleção com 5.000 linhas, ao fazer isso, estes dados serão distribuídos em partições, e cada uma destas partições conterá um pedaço dos dados, por exemplo, se o luster tiver 100 partições, idealmente cada partição conterá 500 linhas daquela coleção. Nem sempre é isso que acontece, mas quanto melhor distribuídos forem os dados, mais rápido eles serão processados, pois cada uma destas partições irá executar as operaçõe em um Executor diferente.

Características 

Todas as operações são realizadas de forma lazy, desta forma, ele consegue otimizar bastante a execução das operações. Imagine que você pegue aquela mesma coleção de 5000 linhas, do exemplo anterior, e executa 4 filtros diferentes. Mesmo que você coloque estas operações em execuções diferentes e imagine que ele terá que percorrer a coleção 4 vezes, para executar os 4 filtros, não é isso que acontece, ele irá concentrar todas as operações e executá-las todas de uma vez, somente quando uma operação final for realizada. Operações finais são aquelas que exigem um retorno dos dados, ou uma extração de dados como resultado. Operações de count, collect, write e take são exemplos de destes tipos de operaçoes.

Claro que este foi um exemplo simples de otimização, mas estas otimizações acontecem em diversos níveis diferentes e, as vezes, até é possível eliminar algumas operações redundantes.

Performance:

Performance é uma das grandes do Spark, quando ele surgiu a propaganda era que ele rodava até 100x mais rápido que o Hadoop, sistema usado em plataformas de bigdata na época. Isso é fruto destas otimizações e também pois, com ele, é possível guardar snapshots (fotografia do estado das coleções) na memória do cluster, e isso faz com que o acesso aos dados seja extremamente rápido.

Os dados de cada partição ficam replicados em pelo menos três executores diferentes, para que caso um deles trave, ou tenha algum problema, o dado não seja perdido.

Existem uma série de outros fatores, que influenciaram na performance do Spark, e isso evolui a cada dia, lá em 2013 o Spark era o projeto opensource que mais rápido evoluia no mundo, isso trouxe muitas novidades.

Suporte a linguagens:

Spark foi desenvolvido em Scala, e por isso esta, assim como o Java, eram as linguagens suportadas na sua primeira versão. Depois vieram as implementações de SQL, que é uma implementação própria do Spark, do Python, do R (linguagem estatística, estas duas últimas surgiram pois haviam muitas bibliotecas de Machine Learning disponíveis nelas, .Net ganhou suporte no ano passado e o suporte ao Kotlin está atualmente em desenvolvimento.

Flexibilidade:

Além do suporte a múltiplas linguagens, uma outra característica do Spark é rodar em múltiplos ambientes.

Desde a sua versão inicial o Spark usa como sistema de arquivos o HDFS, que é padrão nos sistemas Hadoop, e ele sempre foi compatível com as ferramentas da stack do Hadoop, isso abriu muitas portas, para fazer com que o Spark fosse escolhido como engine de execução em sistemas de bigdata. Hoje ele é suportado por todas as clouds, além do Mesos, Yarn e Kubernetes, que são administradores de recursos.

Caso de uso:

Neste post faremos um exemplo de um cluster spark, contendo um notebook, um driver e 2 executores, para isso vamos usar a composição de imagens Docker, começando pela criação de uma imagem básica, e o primeiro passo será criar uma imagem básica para o cluster, ela foi criada a partir do github do próprio Spark, para que assim, garantíssemos que tanto o sistema operacional, quanto a instalação do Java, estavam na versão correta, para fazer  isso criarmos o seguinte conteúdo num arquivo cluster-base:

ARG debian_buster_image_tag=8-jre-slim
FROM openjdk:${debian_buster_image_tag}

# -- Layer: OS + Python 3.7

ARG shared_workspace=/opt/workspace

RUN mkdir -p ${shared_workspace} && \
    apt-get update -y && \
    apt-get install -y python3 && \
    ln -s /usr/bin/python3 /usr/bin/python && \
    rm -rf /var/lib/apt/lists/*

ENV SHARED_WORKSPACE=${shared_workspace}

# -- Runtime

VOLUME ${shared_workspace}
CMD ["bash"]

O próximo passo será definir uma imagem base para o spark, nela iremos definir tanto a versão do spark, como a versão do python, assim como outras variáveis de ambiente, que são usadas na execução. Uma nota é que aqui o Driver é chamado de master, nomenclatura que tem sido abolida devido ao peso de preconceito da época da escravidão.

FROM cluster-base

# -- Layer: Apache Spark

ARG spark_version=3.0.1
ARG hadoop_version=2.7

RUN apt-get update -y && \
    apt-get install -y curl && \
    curl https://archive.apache.org/dist/spark/spark-${spark_version}/spark-${spark_version}-bin-hadoop${hadoop_version}.tgz -o spark.tgz && \
    tar -xf spark.tgz && \
    mv spark-${spark_version}-bin-hadoop${hadoop_version} /usr/bin/ && \
    mkdir /usr/bin/spark-${spark_version}-bin-hadoop${hadoop_version}/logs && \
    rm spark.tgz

ENV SPARK_HOME /usr/bin/spark-${spark_version}-bin-hadoop${hadoop_version}
ENV SPARK_MASTER_HOST spark-master
ENV SPARK_MASTER_PORT 7077
ENV PYSPARK_PYTHON python3

# -- Runtime

WORKDIR ${SPARK_HOME}

  • SPARK_HOME é uma variável de ambiente que define o diretório onde o Spark estará instalado
  • SPARK_MASTER_HOST é o local onde estará instalado o Driver do Spark.
  • SPARK_MASTER_PORT é a porta onde o Diver irá rodar
  • PYSPARK_PYTHON é a versão do python que será usada dentro o Spark.

Repare que a versão usada pelo Spark, é diferente da versão usada na imagem base, mas elas não tem relação entre si, esta é a versão que será usada dentro do Spark.

Nos próximos passos serão criadas as imagens do Driver e dos Executores do spark, as quais serão colocadas, respectivamente, nos seguintes arquivos spark-master e spark-executor, sendo o arquivo do driver:

FROM spark-base

# -- Runtime

ARG spark_master_web_ui=8080

EXPOSE ${spark_master_web_ui} ${SPARK_MASTER_PORT}
CMD bin/spark-class org.apache.spark.deploy.master.Master >> logs/spark-master.out

E o arquivo do executor:

FROM spark-base

# -- Runtime

ARG spark_worker_web_ui=8081

EXPOSE ${spark_worker_web_ui}
CMD bin/spark-class org.apache.spark.deploy.worker.Worker spark://${SPARK_MASTER_HOST}:${SPARK_MASTER_PORT} >> logs/spark-worker.out

Perceba que as configurações realizadas aqui são mínimas, web-ui é uma interface que o spark prove para debugarmos a execução dos seus jobs, e nesta configuração setamos a porta 8081 para cada uma delas.

Tirando isso a outra configuração é simplesmente o software que será executado, além da localização do log de saída da sua execução. 

Agora a última imagem que está faltando é a imagem do notebook, nela será instalado o jupyter e configurada a sua execução:

FROM cluster-base

# -- Layer: JupyterLab

ARG spark_version=3.0.1
ARG jupyterlab_version=2.1.5

RUN apt-get update -y && \
    apt-get install -y python3-pip && \
    pip3 install pyspark==${spark_version} jupyterlab==${jupyterlab_version} wget

# -- Runtime

EXPOSE 8888
WORKDIR ${SHARED_WORKSPACE}
CMD jupyter lab --ip=0.0.0.0 --port=8888 --no-browser --allow-root --NotebookApp.token=

Repare que as versões tanto do spark, quanto do jupyter estão como argumentos desta imagem, pois ela precisa instalar o pyspark e o jupyter.

Como último passo, falta colocar esta estrutura pra rodar, e aqui usaremos o docker-compose, que foi feito para criar clusters dentro de uma mesma rede, ou máquina.

version: "3.6"
volumes:
  shared-workspace:
    name: "hadoop-distributed-file-system"
    driver: local
services:
  jupyterlab:
    image: spark-jupyter
    container_name: spark-jupyter
    ports:
      - 8888:8888
    volumes:
      - shared-workspace:/opt/workspace
  spark-master:
    image: spark-master
    container_name: spark-master
    ports:
      - 8080:8080
      - 7077:7077
    volumes:
      - shared-workspace:/opt/workspace
  spark-worker-1:
    image: spark-executor
    container_name: spark-executor-1
    environment:
      - SPARK_WORKER_CORES=1
      - SPARK_WORKER_MEMORY=512m
    ports:
      - 8081:8081
    volumes:
      - shared-workspace:/opt/workspace
    depends_on:
      - spark-master
  spark-worker-2:
    image: spark-executor
    container_name: spark-executor-2
    environment:
      - SPARK_WORKER_CORES=1
      - SPARK_WORKER_MEMORY=512m
    ports:
      - 8082:8081
    volumes:
      - shared-workspace:/opt/workspace
    depends_on:
      - spark-master

Este conteúdo vai dentro de um arquivos chamado docker-compose.yml, que será usado para executar todas as imagens, mas antes disso, precisamos construir cada uma das imagens. Veja que cada executor vai usar apenas um core e 512m de memória, esta é uma configuração bem básica, mas ela pode ser alterada caso você tenha uma máquina poderosa.

Construção das imagens:

A construção das imagens é feita através de um comando de build do docker e como as imagens possuem dependência entre si, deveremos executar o build nesta sequencia:

SPARK_VERSION="3.0.1
HADOOP_VERSION="2.7"
JUPYTERLAB_VERSION="2.1.5"

docker build -f cluster-base -t cluster-base .

docker build --build-arg spark_version="${SPARK_VERSION}" --build-arg hadoop_version="${HADOOP_VERSION}" -f spark-base -t spark-base .

docker build -f spark-master -t spark-master .

docker build -f spark-worker -t spark-worker .

docker build --build-arg spark_version="${SPARK_VERSION}" --build-arg jupyterlab_version="${JUPYTERLAB_VERSION}" -f spark-jupyter -t spark-jupyter .

Repare que as versões usadas aqui neste cluster, são todas definidas em variáveis de ambiente nos primeiros comandos e elas são usadas para construir a imagem base do spark. Assim caso você esteja aqui no futuro e a versão mais recente do spark seja outra, você só precisa mudar este comando que suas imagens estarão atualizadas. Para saber qual é a última versão do Spark acesse a página deles.

Execução:

Assim que construídas, chegou a hora de executarmos as imagens e para fazer isso é só executar o comando:

docker-compose up

Se ele executar com sucesso, algumas mensagens de log serão impressas no seu console, sendo que as últimas serão:

spark-executor-2  | 20/09/28 19:04:10 INFO Worker: Successfully registered with master spark://spark-master:7077

Isso será mostrado para cada uma das instâncias do nosso cluster, mas para verificar se todas elas subiram corretamente, vamos acessá-las usando os seguintes endereços:

  • JupyterLab at localhost:8888;
  • Spark master at localhost:8080;
  • Spark worker I at localhost:8081;
  • Spark worker II at localhost:8082;
O jupyterlab é onde executaremos os comandos no cluster, os endereços do Driver e dos executores mostrarão as interfaces de debug do Spark.

 Mão na massa:

Agora chegou a parte mais legal do artigo, que é quando vamos para a parte prática, para acessar o cluster vamos criar um notebook no Jupyter e executar alguns comandos. Para acessar um cluster Spark, precisamos criar um contexto do Spark e isso será feito a partir dos seguintes comandos:

 


 No primeiro comando é onde criamos a conexão com o Spark, para isso dois fatores serão definidos, o nome do applicativo (exemplo-pyspark) e o endereço do master, porta 7077. Repare que as setas vermelhas apontam para um arquivo, que foi baixado, através do comando wget. Este arquivo que foi usado como exemplo de coluna de dados.

Veja que aqui ainda não fizemos nada de muito interessante, mas muita coisa pode ser feita com este servidor para processar dados em paralelo.

Apresentação

 

Nenhum comentário:

Postar um comentário