🇧🇷 Highlights Spark the definitive guide
🇬🇧 To read this article in English click here
Antes de começar, gostaria de deixar claro que esses highlights são baseados em minhas anotações sobre o que li e entendi do livro. Não utilizo nenhum exemplo presente na obra e escrevo com base em minhas próprias palavras e pensamentos extraídos dela (não me processem).
Dito isso, vamos falar sobre o que considerei mais interessante e importante nesse livro. Algumas das coisas que mencionarei aqui podem parecer básicas para quem já tem algum conhecimento sobre Spark — e, de fato, são. Por isso, classifiquei os assuntos como básico(🟢), mediano(🟡) e avançado(🔴), para que você possa se orientar melhor.
Bora lá.
Estruturas de Dados no Spark (🟢)
O livro apresenta um pouco sobre estrutura de dados no spark, sendo elas RDDs, DataFrames e Datasets, aqui vai um resumo nem tão breve sobre elas.
RDDs (Resilient Distributed Datasets):
Os RDDs (Resilient Distributed Datasets) são a base do Apache Spark para lidar com dados de forma distribuída. Basicamente, eles dividem os dados em várias partes e espalham pelos nós do cluster, permitindo que o processamento aconteça de forma paralela e eficiente.
O que faz os RDDs especiais?
Imutáveis → Depois de criado, um RDD não muda. Se precisar modificar, o Spark gera um novo.
Distribuídos → Os dados são automaticamente espalhados pelo cluster.
Tolerantes a falhas → Se um nó cair, o Spark consegue recriar os dados usando o histórico das operações.
Lazy Evaluation → O Spark não executa as operações imediatamente, só quando é realmente necessário (tipo quando chamamos um collect() ou count()).
Como trabalhar com RDDs?
Transformações → Criam novos RDDs (ex: map(), filter(), flatMap(), reduceByKey()).
Ações → Executam as operações e retornam um resultado (ex: collect(), count(), take()).
Os RDDs são poderosos, mas hoje em dia o uso de DataFrames e Datasets é mais comum, já que eles aproveitam melhor as otimizações do Spark. Mesmo assim, entender RDDs ajuda a ter mais controle sobre o processamento (e isso pode cair em uma entrevista de emprego, então preste atenção).
DataFrames
Os DataFrames são a forma mais eficiente de trabalhar com dados no Spark. Eles funcionam como tabelas SQL, organizando os dados em colunas e linhas, mas com toda a performance e paralelismo do Spark por trás.
O que faz os DataFrames especiais?
Otimização automática → O Spark usa o Catalyst Optimizer(é o otimizador de consultas do Spark SQL, ele torna tudo mais eficiente antes de executar) para deixar as consultas mais rápidas.
Menos código, mais desempenho → Operações são mais simples e eficientes do que em RDDs.
Integração com SQL → Podemos rodar consultas SQL diretamente nos DataFrames (mais fácil que isso, impossível)
Suporte a diferentes formatos → Fácil leitura e escrita em JSON, Parquet, CSV, Avro, etc.
Como trabalhar com DataFrames?
Transformações → Como em SQL (select(), filter(), groupBy(), join()).
Ações → Executam e retornam resultados (show(), count()).
Os DataFrames são mais rápidos e otimizados que os RDDs, sendo a escolha ideal para a maioria dos casos. Se precisar de mais controle, ainda dá para converter um DataFrame para RDD quando necessário (mas raramente você vai precisar fazer isso).
Datasets
Os Datasets são uma mistura do melhor dos RDDs e DataFrames. Eles são fortemente tipados, o que significa mais segurança no código, e ainda aproveitam as otimizações automáticas do Spark. São ideais quando precisamos de mais controle sobre os dados.
O que faz os Datasets especiais?
Fortemente tipados → O Spark sabe exatamente o tipo de dado, evitando erros em tempo de execução.
Otimização inteligente → Usa o Catalyst Optimizer e o Tungsten para rodar mais rápido.
APIs flexíveis → Permitem tanto operações funcionais (como nos RDDs) quanto SQL-like (como nos DataFrames).
Interoperabilidade → Nome difícil para dizer que podem ser convertidos entre DataFrames, Datasets e RDDs.
Como trabalhar com Datasets?
Transformações → map(), filter(), groupBy(), join().
Ações → show(), count().
Os Datasets são uma ótima opção quando precisamos de mais segurança no código e boas otimizações.
No meio de tudo isso, os DataFrames são os mais utilizados.
Manipulação de Dados com DataFrames e SQL
Uso do Spark SQL para consultas relacionais(🟢, mas vale a pena dar uma olhada)
O livro nos mostra que o Spark permite fazer consultas relacionais usando uma sintaxe SQL familiar, mas isso a gente já sabe, o que eu gostaria de trazer aqui é, quando utilizar Spark SQL ou APIs do Spark, ou até mesmo entrar na “polêmica” discussão de qual o mais rápido.
Sobre quando utilizar cada uma delas. Se você está fazendo muitas operações de consulta e já é familiarizado com SQL, Spark SQL é uma ótima escolha. Se você precisa de controle e flexibilidade, ou está fazendo operações mais complexas, use as APIs do Spark.
No final, você também pode misturar os dois! Utilize o que for mais conveniente para o que você está tentando realizar.
Quando se trata de velocidade, o Spark SQL geralmente é mais rápido que as APIs do Spark, tipo DataFrames e RDDs, especialmente para consultas relacionais. Isso rola por causa do Catalyst Optimizer, que faz um ótimo trabalho de otimização. Aqui estão algumas razões para isso:
Otimização de Consultas:
O Catalyst dá uma olhada nas suas consultas SQL e faz ajustes para melhorar a execução, cortando tempo de resposta.
Execução em Pipeline:
O Spark SQL executa várias operações em um pipeline, o que significa que lê e grava dados de forma mais eficiente, ajudando em consultas complexas.
Uso Eficiente de Recursos:
O Spark SQL é feito para usar a memória e os recursos do cluster de forma inteligente, se ajustando para garantir que tudo rode liso e rápido.
Broadcast Joins:
Quando você precisa juntar tabelas menores, o Spark SQL pode usar joins de broadcast, enviando uma tabela pequena para todos os nós, acelerando a junção.
Resumão:
Spark SQL é geralmente mais rápido para consultas relacionais por conta das suas otimizações e melhor uso dos recursos.
As APIs do Spark podem se sair melhor em situações onde a lógica é mais complexa ou quando você está lidando com dados não relacionais.
No fim das contas, o desempenho pode variar dependendo dos dados e das operações que você está fazendo.
Otimizações internas do Catalyst Optimizer, Tungsten Execution Engine, gerenciamento de memória(🔴)
Na verdade, o livro menciona apenas o Catalyst Optimizer de forma mais básica. Eu quis expandir esse assunto e me aprofundar um pouco mais nas otimizações internas, proporcionando uma visão mais completa sobre o tema.
Catalyst Optimizer
Como já expliquei anteriormente, o Catalyst Optimizer é o responsável por otimizar as consultas no Spark SQL e nos DataFrames/Datasets. Ele aplica uma série de transformações para tornar a execução mais eficiente. O Catalyst opera em diferentes fases:
Fases do Catalyst Optimizer
Análise (Analysis)
O código SQL ou DataFrame é convertido em um Logical Plan baseado na estrutura dos dados (Schema).
Verifica erros como colunas inexistentes ou tipos de dados incompatíveis.
Otimização
Aplica Regras de Reescrita para melhorar a consulta. Exemplos:
Predicate Pushdown: Filtragens (WHERE) são movidas para mais cedo na execução, reduzindo a quantidade de dados processados.
Constant Folding: Expressões constantes são resolvidas antes da execução para evitar cálculos desnecessários.
Geração do Plano Físico (ou Physical Planning, se preferir)
O plano lógico é convertido em um Plano Físico, onde estratégias de execução são escolhidas.
O Spark escolhe entre Broadcast Join, Sort Merge Join, Shuffle Hash Join, dependendo do tamanho dos dados.
O código final é gerado usando bytecode otimizado em Java para melhorar a execução.
Não entendeu muito bem?
Exemplo de Otimização do Catalyst
Vamos ver um exemplo simples de Predicate Pushdown e Constant Folding:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("CatalystOptimization").getOrCreate()
df = spark.read.parquet("employees.parquet")
df_filtered = df.filter("salary > 50000 AND department = 'Engineering' AND 1 = 1")\
.select("name", "salary", "department")
df_filtered.explain(True)
Predicate Pushdown
O filtro salary > 50000 e department = ‘Engineering’ será aplicado antes da leitura dos dados, reduzindo a quantidade de informações carregadas para memória.
Isso funciona especialmente bem com formatos colunares como Parquet e ORC.
Constant Folding
O Catalyst detecta que 1 = 1 não impacta na consulta e simplesmente remove essa parte.
Pruning de Colunas (Column Pruning)
Como estamos selecionando apenas name, salary e department, o Spark não carrega as outras colunas, reduzindo uso de memória e processamento.
Entendeu melhor agora?
Tungsten Execution Engine
O Tungsten Execution Engine melhora o desempenho reduzindo a sobrecarga do interpretador Java e otimizando o uso de CPU e memória.
Principais Melhorias do Tungsten
Whole-Stage Code Generation (WSCG):
O Spark gera código nativo otimizado para execução, reduzindo o overhead de interpretação.
Pipeline Execution:
Operações são encadeadas para reduzir a necessidade de escrever/intermediar dados no disco.
Vectorized Query Execution (Execução Vetorizada):
Em vez de processar linha por linha, o Spark processa múltiplas linhas ao mesmo tempo em batches, reduzindo chamadas de função.
Exemplo de Execução Vetorizada
df = spark.read.parquet(“large_dataset.parquet”)
df.select(“id”, “salary”).summary().show()
Se os dados estiverem em Parquet, o Spark usará execução vetorizada para reduzir o custo computacional.
Habilitando Execução Vetorizada Manualmente
Se não estiver ativada por padrão:
spark.conf.set(“spark.sql.parquet.enableVectorizedReader”, “true”)
Gerenciamento de Memória no Spark
O Spark usa um modelo híbrido de memória e disco, garantindo que o processamento continue mesmo quando a memória RAM não é suficiente.
Componentes da Memória do Spark
A memória no Spark é dividida em Executor Memory:
Storage Memory:
Armazena dados em cache de RDDs e DataFrames.
Execution Memory:
Usada para operações como Sort, Aggregations e Joins.
Se a Execution Memory precisar de mais espaço, ela pode tomar parte da Storage Memory e vice-versa.
Configurando Memória no Spark
Podemos ajustar a memória disponível para os executores:
spark.conf.set("spark.executor.memory", "4g") #define 4GB para cada executor
spark.conf.set("spark.memory.fraction", "0.6") #60% da memória será para execução
Fácil?
Spill to Disk (Fallback)
Se a memória acabar, o Spark escreve dados temporários no disco:
Isso é ruim para desempenho, então devemos evitar aumentando executor.memory.
Dica!!!!!!!!: Sempre use .explain(True) para entender como o Spark está otimizando suas consultas!
Leitura e Escrita de Dados(🟢)
Read
Agora, vamos sair de um assunto que pode ser bem complicado de entender para um bem simples.
O livro mostra conectores para diferentes fontes: CSV, JSON, Parquet, Avro, ORC
vou resumir tudo em um único exemplo de código para você.
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("DataConnectors").getOrCreate()
#lendo CSV
df_csv = spark.read.option("header", "true").option("inferSchema", "true").csv("data.csv")
df_csv.show()
#lendo JSON
df_json = spark.read.option("multiline", "true").json("data.json")
df_json.show()
#lendo Parquet
df_parquet = spark.read.parquet("data.parquet")
df_parquet.show()
#lendo Avro
df_avro = spark.read.format("avro").load("data.avro")
df_avro.show()
#lendo ORC
df_orc = spark.read.orc("data.orc")
df_orc.show()
Integração com bancos de dados como MySQL, PostgreSQL e NoSQL (Cassandra, MongoDB)
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("DatabaseConnectors").getOrCreate()
#conexão com MySQL
df_mysql = spark.read \
.format("jdbc") \
.option("url", "jdbc:mysql://localhost:3306/db_name") \
.option("driver", "com.mysql.cj.jdbc.Driver") \
.option("dbtable", "table_name") \
.option("user", "username") \
.option("password", "password") \
.load()
df_mysql.show()
#conexão com PostgreSQL
df_postgres = spark.read \
.format("jdbc") \
.option("url", "jdbc:postgresql://localhost:5432/db_name") \
.option("driver", "org.postgresql.Driver") \
.option("dbtable", "table_name") \
.option("user", "username") \
.option("password", "password") \
.load()
df_postgres.show()
#conexão com Cassandra
df_cassandra = spark.read \
.format("org.apache.spark.sql.cassandra") \
.option("keyspace", "keyspace_name") \
.option("table", "table_name") \
.load()
df_cassandra.show()
#conexão com MongoDB
df_mongo = spark.read \
.format("mongo") \
.option("uri", "mongodb://localhost:27017/db_name.collection_name") \
.load()
df_mongo.show()
Uso do Delta Lake para versionamento e gerenciamento de tabelas transacionais(🟡)
Vamos voltar a subir um pouco o nível da conversa. O livro nos apresenta um pouco sobre o uso de Delta Lake (pouco até demais), vou trazer algumas coisas apresentadas e tentar agregar um pouco mais.
Por favor, não confunda Delta Lake com Data Lake!
O Delta Lake é tipo um upgrade para o Spark que deixa seu data lake mais seguro, rápido e confiável. Ele usa o formato Parquet, mas com controle de versão, suporte a transações ACID e melhorias na performance.
O que faz o Delta Lake ser tão bom?
Transações ACID – Evita corrupção de dados e garante que as mudanças sejam aplicadas corretamente.
Versionamento (Time Travel) – Permite acessar versões antigas da tabela.
Evolução de Schema – Aceita mudanças na estrutura da tabela sem dar erro.
Merge, Upsert e Delete otimizados – Atualizar ou apagar dados fica muito mais fácil.
Vamos ver alguns exemplos:
Exemplo no PySpark
Criando uma Tabela Delta
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("DeltaLakeExample") \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
.getOrCreate()
data = [(1, "Alice", 5000), (2, "Bob", 6000)]
df = spark.createDataFrame(data, ["id", "name", "salary"])
df.write.format("delta").mode("overwrite").save("/tmp/delta_table")
Aqui, a SparkSession é configurada com as extensões do Delta Lake, permitindo que o Spark entenda e trabalhe com esse formato de dados. Perceba que utilizo o formato “delta” isso já faz com que sua tabela delta seja salva no formato parquet.
Com a tabela Delta, é possível “voltar no tempo” e navegar por suas modificações permitindo recuperar versões. Eu jã escrevi um artigo bem completo explicando em detalhes como funciona o TimeTravel do Delta Lake, caso queira se aprofundar:
https://gustavosantanadata.com.br/como-voltar-no-tempo-e-acessar-versoes-historicas-de-suas-tabelas/
Mas aqui vai um exemplo rápido:
df_old_version = spark.read.format("delta").option("versionAsOf", 0).load("/tmp/delta_table")
df_old_version.show()
Aqui estamos lendo os dados salvos anteriormente no Delta Lake, mas com um detalhe:
.option(“versionAsOf”, 0) → Isso diz para o Spark carregar a versão 0 da tabela, ou seja, a primeira versão criada.
O Delta Lake mantém um histórico de mudanças na tabela. Cada vez que os dados são sobrescritos ou atualizados, ele cria uma nova versão (exemplo: versão 0, versão 1, versão 2…).
Fazendo Upsert com MERGE
from delta.tables import DeltaTable
updates = spark.createDataFrame([
(1, "Alice", 7000), #atualizando salário
(3, "Charlie", 8000) #novo registro
], ["id", "name", "salary"])
delta_table = DeltaTable.forPath(spark, "/tmp/delta_table")
delta_table.alias("old") \
.merge(updates.alias("new"), "old.id = new.id") \
.whenMatchedUpdate(set={"salary": "new.salary"}) \
.whenNotMatchedInsert(values={"id": "new.id", "name": "new.name", "salary": "new.salary"}) \
.execute()
Esse código faz um MERGE (upsert) em uma tabela Delta Lake, ou seja, atualiza registros existentes e insere novos registros de forma eficiente e transacional.
O Delta Lake verifica se os novos dados já existem na tabela e decide o que fazer:
Se o id já existe (whenMatchedUpdate) → Atualiza o salário.
Se o id ainda não existe (whenNotMatchedInsert) → Insere o novo registro.
Por que isso é mais eficiente?
Bom, primeiramente você vai parar de fazer gambiarra quando precisar realizar uma modificação na tabela(👀)
E também porque isso evita duplicação de dados, mantém histórico das mudanças (Delta Lake versiona tudo), mais eficiente que deletar e reescrever a tabela inteira, etc.
Processamento de Dados em Streaming(🟡)
No livro, a parte sobre processamento de dados em streaming é bem completa e traz vários exemplos de código interessantes. Essa é uma das partes que mais gostei.
Não vou compartilhar o trecho aqui, pois não quero ter problemas com direitos autorais, mas vou introduzir um pouco do que é apresentado no livro, com minha visão e conhecimento.
O Structured Streaming é a API do Spark para processamento contínuo de dados. Ele permite processar fluxos de dados quase em tempo real, usando a mesma estrutura de DataFrames e SQL que já conhecemos.
O Structured Streaming trabalha de forma declarativa, ou seja, você escreve o código como se estivesse lidando com um DataFrame estático, mas o Spark trata os dados como um fluxo contínuo.
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("StreamingExample").getOrCreate()
df_stream = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load()
df_stream.writeStream.format("console").start().awaitTermination()
Nesse exemplo, o Spark está lendo mensagens em tempo real de um socket (porta 9999) e imprimindo na tela.
O Structured Streaming pode operar de duas formas diferentes:
Micro-Batch (Padrão)
O fluxo de dados é processado em pequenos lotes (batches).
Cada batch tem um intervalo de tempo fixo (ex: processar novos dados a cada 1 segundo).
Boa performance e compatibilidade com a maioria das fontes de dados.
Continuous Processing (Baixa Latência)
Os dados são processados registro por registro, sem esperar formar um batch.
Ideal para baixa latência, mas tem menos compatibilidade com algumas operações.
df_stream.writeStream.format(“console”).trigger(continuous=”1 second”).start()
Com esse código, o processamento acontece de forma contínua, sem esperar formar batches.
O Spark pode consumir dados de várias fontes de streaming:
| 🔌 Fonte | Exemplo de Uso |
| Kafka | Logs de aplicações, eventos de usuário |
| Kinesis | Integração com AWS, IoT |
| Socket | Testes locais, mensagens de rede |
| Arquivos em tempo real | Logs de diretórios monitorados |
Exemplo: Lendo um fluxo de dados do Kafka
df_kafka = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "meu_topico") \
.load()
Aqui, o Spark está consumindo mensagens do tópico meu_topico no Kafka. Sempre que uma nova mensagem chega, o Spark a captura, processa e pode aplicar transformações em tempo real. Isso permite análises contínuas, como filtragem, agregação e até escrita dos resultados em outro sistema, como um banco de dados ou outro tópico do Kafka.
O Structured Streaming garante tolerância a falhas usando Checkpoints e Write-Ahead Logs(mecanismo de registro que garante durabilidade e tolerância a falhas no processamento de dados).
Checkpointing armazena o estado da aplicação, permitindo que ela continue do ponto onde parou caso haja falha.
df_stream.writeStream \
.format("parquet") \
.option("checkpointLocation", "/tmp/checkpoints") \
.start()
Se o Spark reiniciar, ele vai continuar do último estado salvo em /tmp/checkpoints.
Resumo do resumo do resumo
Structured Streaming → API declarativa para processar fluxos de dados como DataFrames.
Micro-Batch → Processa em pequenos lotes (padrão).
Continuous Processing → Processa dado a dado (latência menor).
Fontes de Dados → Kafka, Kinesis, sockets e arquivos em tempo real.
Checkpointing → Evita perda de dados e garante tolerância a falhas.
Machine Learning e Inteligência Artificial(🔴)
Diferentemente do tópico anterior, onde o livro é bastante completo e apresenta diversos exemplos, neste caso, o livro é bastante raso e foca em explicar superficialmente o MLlib e o K-means.
Entendo que, caso o autor se aprofundasse mais no tema, seria necessário um livro só para isso, já que é um assunto complexo que exige muita dedicação. Por isso, ao invés de detalhar excessivamente o conteúdo, vou adotar uma abordagem diferente: vou falar menos sobre o que é, mas mostrar mais como fazer, apresentando diversos exemplos de código.
Essa parte é bastante interessante, especialmente com o assunto tão em alta atualmente. O que vou apresentar é apenas uma porta de entrada, e recomendo que você se aprofunde mais sobre o tema fora daqui, principalmente se deseja se tornar um profissional requisitado no futuro.
Machine Learning (ML) no Apache Spark
O Machine Learning (ML) no Apache Spark é bem simples de usar com a biblioteca MLlib. Ela tem várias ferramentas para trabalhar com modelos de aprendizado de máquina em grandes volumes de dados.
Introdução ao MLlib
O MLlib é a biblioteca de aprendizado de máquina do Spark, feita para lidar com dados grandes e distribuídos. Ele oferece algoritmos eficientes para tarefas comuns de ML, como regressão, classificação, clustering e redução de dimensionalidade.
Pré-processamento de dados
Antes de treinar um modelo, os dados precisam de alguns ajustes, como:
Limpeza: remover valores nulos ou corrigir dados.
Feature engineering: criar novas variáveis a partir das existentes.
Codificação de variáveis categóricas (ex: OneHotEncoding ou StringIndexer).
Escalonamento de variáveis numéricas (ex: usando StandardScaler ou MinMaxScaler).
Modelos suportados
O MLlib tem vários modelos, como:
Regressão: Para prever valores contínuos (ex: regressão linear).
Classificação: Para categorizar dados em classes (ex: árvore de decisão, SVM).
Clustering: Para agrupar dados semelhantes (ex: K-Means).
Redução de dimensionalidade: Para simplificar os dados sem perder informações (ex: PCA).
Pipelines e Tunagem de Hiperparâmetros
Pipelines: Ajudam a organizar as etapas do ML, como pré-processamento, treinamento e avaliação de modelos, tudo em um fluxo de trabalho reutilizável.
Tunagem de Hiperparâmetros: Refere-se a encontrar os melhores parâmetros para otimizar o modelo. Isso pode ser feito com técnicas como Grid Search e Random Search, usando o CrossValidator do Spark.
Vamos aos exemplos!!!!!
Exemplos úteis de código em Apache Spark para Machine Learning usando a biblioteca MLlib. Vou cobrir casos comuns como classificação, regressão, clustering e pipelines.
1. Exemplo de Regressão Linear (Regressão)
Esse exemplo usa regressão linear para prever um valor contínuo, como o preço de uma casa, com base em variáveis preditoras.
from pyspark.sql import SparkSession
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler
spark = SparkSession.builder.appName("LinearRegressionExample").getOrCreate()
data = spark.read.csv("data.csv", header=True, inferSchema=True)
#preparar dados (Criar features a partir das colunas)
assembler = VectorAssembler(inputCols=["feature1", "feature2", "feature3"], outputCol="features")
data = assembler.transform(data)
#dividir dados em treino e teste
train_data, test_data = data.randomSplit([0.8, 0.2])
#inicializar e treinar o modelo
lr = LinearRegression(featuresCol="features", labelCol="label")
lr_model = lr.fit(train_data)
#avaliar o modelo
test_results = lr_model.evaluate(test_data)
print("RMSE: ", test_results.rootMeanSquaredError)
Realizamos uma regressão linear utilizando o PySpark. Quando executado, o que será impresso é o erro quadrático médio (RMSE) do modelo de regressão linear nos dados de teste.
O RMSE (Root Mean Squared Error) é uma métrica usada para avaliar a precisão do modelo. Quanto menor o valor de RMSE, melhor o modelo.
Portanto, o que será impresso no console é:
RMSE: <valor do erro quadrático médio calculado>
2. Exemplo de Classificação com RandomForest
Esse exemplo utiliza Random Forest para um problema de classificação (ex: prever se um e-mail é spam ou não).
from pyspark.sql import SparkSession
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
spark = SparkSession.builder.appName("RandomForestExample").getOrCreate()
data = spark.read.csv("classification_data.csv", header=True, inferSchema=True)
#preparar dados
assembler = VectorAssembler(inputCols=["feature1", "feature2", "feature3"], outputCol="features")
data = assembler.transform(data)
#dividir dados
train_data, test_data = data.randomSplit([0.7, 0.3])
#inicializar e treinar o modelo
rf = RandomForestClassifier(featuresCol="features", labelCol="label")
rf_model = rf.fit(train_data)
#avaliar o modelo
predictions = rf_model.transform(test_data)
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Accuracy: ", accuracy)
utilizamos um classificador Random Forest para realizar a classificação em um conjunto de dados que calcula a precisão (accuracy) do modelo nos dados de teste. O que será impresso no console é a precisão (accuracy) do modelo.
O valor de “accuracy” depende dos dados presentes no arquivo classification_data.csv e de como o modelo se ajusta aos dados de treinamento e teste. A precisão é uma métrica de avaliação que indica a proporção de previsões corretas feitas pelo modelo em relação ao total de previsões.
Portanto, o que será impresso no console é:
Accuracy: <valor da precisão calculado>
Esse valor representará a taxa de acertos do modelo de Random Forest nos dados de teste.
3. Exemplo de Clustering com K-Means
Esse exemplo usa o algoritmo K-Means para agrupar dados em clusters, como segmentar clientes com base em características similares.
from pyspark.sql import SparkSession
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import ClusteringEvaluator
#iniciar SparkSession
spark = SparkSession.builder.appName("KMeansExample").getOrCreate()
#carregar dados
data = spark.read.csv("customer_data.csv", header=True, inferSchema=True)
#preparar dados
assembler = VectorAssembler(inputCols=["feature1", "feature2", "feature3"], outputCol="features")
data = assembler.transform(data)
#treinar o modelo
kmeans = KMeans().setK(3).setSeed(1)
model = kmeans.fit(data)
#previsões e avaliação
predictions = model.transform(data)
evaluator = ClusteringEvaluator()
silhouette = evaluator.evaluate(predictions)
print("Silhouette With Set of Clusters: ", silhouette)
Usamos o algoritmo K-Means que calcula o valor da silhouette score, que é uma métrica usada para avaliar a qualidade do agrupamento.
O valor de silhouette varia de -1 a 1:
Um valor próximo de 1 indica que os clusters estão bem definidos.
Um valor próximo de 0 indica que os clusters estão mal definidos.
Um valor negativo indica que os dados podem estar agrupados de maneira errada.
Portanto, o que será impresso no console será algo como:
Silhouette With Set of Clusters: <valor da métrica de silhouette calculado>
Esse valor depende dos dados presentes no arquivo customer_data.csv e dos clusters gerados pelo modelo K-Means.
4. Exemplo de Ajuste de Hiperparâmetros com GridSearch
Aqui mostramos como usar GridSearch para otimizar hiperparâmetros no modelo de RandomForest.
from pyspark.sql import SparkSession
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import VectorAssembler
spark = SparkSession.builder.appName("GridSearchExample").getOrCreate()
data = spark.read.csv("classification_data.csv", header=True, inferSchema=True)
#preparar dados
assembler = VectorAssembler(inputCols=["feature1", "feature2", "feature3"], outputCol="features")
data = assembler.transform(data)
#dividir dados
train_data, test_data = data.randomSplit([0.7, 0.3])
#inicializar RandomForest
rf = RandomForestClassifier(featuresCol="features", labelCol="label")
#construir grid de parâmetros
paramGrid = ParamGridBuilder().addGrid(rf.numTrees, [10, 20]) \
.addGrid(rf.maxDepth, [5, 10]).build()
#avaliador
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
#crossValidator
cv = CrossValidator(estimator=rf, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=3)
#treinar o modelo
cvModel = cv.fit(train_data)
#avaliar o modelo
predictions = cvModel.transform(test_data)
accuracy = evaluator.evaluate(predictions)
print("Best Model Accuracy: ", accuracy)
realizamos um Grid Search combinado com Cross-Validation para encontrar os melhores hiperparâmetros de um modelo de Random Forest e, em seguida, avaliamos sua precisão (accuracy) nos dados de teste.
O código imprimirá a precisão (accuracy) do melhor modelo encontrado pela validação cruzada. O valor exato dependerá dos dados contidos no arquivo classification_data.csv e de como o modelo Random Forest se ajusta aos dados.
A saída será algo como:
Best Model Accuracy: <valor da acurácia calculado>
Esse valor representa a taxa de acertos do modelo nos dados de teste após a otimização dos hiperparâmetros (numTrees e maxDepth).
Processamento de Dados com GraphX(🟡)
O livro nos introduz ao GraphX, e essa é uma ferramenta MUITO interessante!
Ele é a biblioteca do Spark para trabalhar com grafos, ou seja, dados conectados, tipo redes sociais, sistemas de recomendação e análise de relacionamento entre entidades. Ele permite processar grandes volumes de dados distribuídos de maneira eficiente.
No GraphX, um grafo é composto por:
Vértices (nodes): Representam os objetos, como usuários em uma rede social.
Arestas (edges): Conectam os vértices e representam relacionamentos (ex: fulano segue ciclano).
Atributos: Tanto vértices quanto arestas podem ter atributos, tipo peso de uma conexão.
O Spark permite criar grafos a partir de DataFrames, RDDs ou carregar de fontes como arquivos Parquet e bancos de dados.
Algoritmos comuns no GraphX
GraphX já vem com alguns algoritmos prontos para facilitar a análise de grafos. Alguns dos mais usados:
PageRank: Calcula a importância de cada nó dentro de um grafo (muito usado em mecanismos de busca, como o Google).
Connected Components: Identifica grupos de nós conectados entre si (útil para descobrir comunidades em redes sociais).
Shortest Paths: Encontra o menor caminho entre dois nós (exemplo: calcular a rota mais curta entre cidades num mapa).
GraphX é muito útil quando precisamos analisar conexões entre elementos. Alguns exemplos:
Redes sociais: Descobrir influenciadores com PageRank ou identificar comunidades de usuários.
Sistemas de recomendação: Encontrar relações entre usuários e produtos para sugerir novas compras.
Análise de fraudes: Detectar padrões suspeitos em transações financeiras.
No fim, o GraphX transforma relações complexas em insights valiosos, aproveitando a escalabilidade do Spark para lidar com grandes volumes de dados conectados.
Vou apresentar um caso de uso para calcular o PageRank de uma rede de usuários!
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import col
from graphframes import GraphFrame
#SparkSession com suporte ao GraphFrames
spark = SparkSession.builder \
.appName("GraphXExample") \
.config("spark.jars.packages", "graphframes:graphframes:0.8.2-spark3.0-s_2.12") \
.getOrCreate()
#criando os vértices (nós do grafo)
vertices = spark.createDataFrame([
(1, "Alice"),
(2, "Bob"),
(3, "Charlie"),
(4, "David")
], ["id", "name"])
#criando as arestas (conexões entre os nós)
edges = spark.createDataFrame([
(1, 2),
(2, 3),
(3, 1),
(3, 4),
(4, 2)
], ["src", "dst"])
#criando o grafo
graph = GraphFrame(vertices, edges)
#aplicando o algoritmo PageRank
page_rank = graph.pageRank(resetProbability=0.15, maxIter=10)
#mostrando os resultados
print("PageRank dos vértices:")
page_rank.vertices.select("id", "pagerank").orderBy(col("pagerank").desc()).show()
Esse código cria a SparkSession com suporte ao GraphFrames, define os vértices (pessoas da rede social), define as arestas (quem está conectado a quem), cria o grafo usando o GraphFrame, executa o algoritmo PageRank, que mede a importância de cada nó, exibe os resultados ordenados pelo maior valor de PageRank.
Otimização de Performance no Spark(🔴)
Na minha opinião, essa é uma das partes mais importantes e úteis do livro. Ela aborda particionamento e paralelismo, uso eficiente de cache e persistência, ajuste de configurações de execução (memory tuning, parallelism, shuffle) e estratégias para evitar operações custosas, como wide transformations e shuffles desnecessários.
Fiz um post bem completo no meu site explicando cada um desses conceitos (e mais alguns), então vale a pena conferir!
Esses foram os meus highlights do livro! Espero que tenham sido úteis e que você tenha compreendido bem. Mas não deixe de ler o livro por conta própria—eu recomendo muito! Além disso, fazer seus próprios highlights pode te ajudar a absorver ainda mais cada um dos assuntos.
Se tiver alguma dúvida ou quiser compartilhar um feedback, fique à vontade para entrar em contato comigo!