🇬🇧 Highlights from Spark: The Definitive Guide
🇧🇷 To read this article in Portuguese, click here
Before we start, I’d like to make it clear that these highlights are based on my notes about what I read and understood from the book. I don’t use any examples from the book itself; everything is written in my own words, based on my notes and takeaways (don’t sue me).
That said, let’s talk about what I found most interesting and important in this book. Some of the things I’ll mention here may seem basic for those who already have some knowledge of Spark — and, in fact, they are. That’s why I’ve classified the topics as basic (🟢), intermediate (🟡), and advanced (🔴) so you can better navigate them.
Let’s get started.
Data Structures in Spark (🟢)
The book introduces some of Spark’s data structures, including RDDs, DataFrames, and Datasets. Here’s a not-so-brief summary of them.
RDDs (Resilient Distributed Datasets):
RDDs (Resilient Distributed Datasets) are the foundation of Apache Spark for handling data in a distributed manner. Basically, they split data into multiple parts and distribute them across the cluster nodes, allowing parallel and efficient processing.
What makes RDDs special?
- Immutable → Once created, an RDD cannot be changed. If you need to modify it, Spark generates a new one.
- Distributed → Data is automatically spread across the cluster.
- Fault-tolerant → If a node fails, Spark can recreate the data using the operation history.
- Lazy Evaluation → Spark doesn’t execute operations immediately, only when necessary (like when calling collect() or count()).
How to work with RDDs?
- Transformations → Create new RDDs (e.g., map(), filter(), flatMap(), reduceByKey()).
- Actions → Execute operations and return a result (e.g., collect(), count(), take()).
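To make this concrete, here is a minimal PySpark sketch (the numbers are just placeholders) with a couple of transformations followed by actions:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("RDDBasics").getOrCreate()
sc = spark.sparkContext
# Transformations are lazy: nothing is executed yet
numbers = sc.parallelize([1, 2, 3, 4, 5])        # create an RDD from a local list
squares = numbers.map(lambda x: x * x)           # transformation: produces a new RDD
evens = squares.filter(lambda x: x % 2 == 0)     # another transformation
# Actions trigger the actual computation
print(evens.collect())   # [4, 16]
print(squares.count())   # 5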
RDDs are powerful, but nowadays, DataFrames and Datasets are more commonly used because they take better advantage of Spark’s optimizations. Still, understanding RDDs gives you more control over processing (and this might come up in a job interview, so pay attention).
DataFrames
DataFrames are the most efficient way to work with data in Spark. They function like SQL tables, organizing data into rows and columns while leveraging Spark’s performance and parallelism.
What makes DataFrames special?
- Automatic optimization → Spark uses the Catalyst Optimizer (Spark SQL’s query optimizer) to make queries more efficient before execution.
- Less code, better performance → Operations are simpler and more efficient than in RDDs.
- SQL integration → We can run SQL queries directly on DataFrames (it doesn’t get easier than this).
- Support for multiple formats → Easy reading and writing in JSON, Parquet, CSV, Avro, etc.
How to Work with DataFrames?
- Transformations → Similar to SQL (select(), filter(), groupBy(), join()).
- Actions → Execute operations and return results (show(), count()).
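A minimal sketch of these operations (the file name and column names are made up for illustration):
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
spark = SparkSession.builder.appName("DataFrameBasics").getOrCreate()
# Read a CSV into a DataFrame (schema inferred for simplicity)
df = spark.read.option("header", "true").option("inferSchema", "true").csv("employees.csv")
# Transformations: lazily build the query
high_earners = df.select("name", "department", "salary") \
    .filter(F.col("salary") > 50000) \
    .groupBy("department") \
    .count()
# Actions: trigger execution
high_earners.show()
print(high_earners.count())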
DataFrames are faster and more optimized than RDDs, making them the ideal choice for most cases. If more control is needed, it’s still possible to convert a DataFrame to an RDD when necessary (but you will rarely need to do this).
Datasets
Datasets combine the best of RDDs and DataFrames. They are strongly typed, which means better code safety, while still benefiting from Spark’s automatic optimizations. They are ideal when more control over the data is required. Note that the typed Dataset API is available only in Scala and Java; in PySpark you work with DataFrames (which, in Scala, are simply Dataset[Row]).
What Makes Datasets Special?
- Strongly Typed → Spark knows exactly the data type, preventing runtime errors.
- Smart Optimization → Uses the Catalyst Optimizer and Tungsten for faster execution.
- Flexible APIs → Supports both functional operations (like RDDs) and SQL-like operations (like DataFrames).
- Interoperability → A fancy way of saying that they can be converted between DataFrames, Datasets, and RDDs.
How to Work with Datasets?
- Transformations → map(), filter(), groupBy(), join().
- Actions → show(), count().
Datasets are a great option when code safety and good optimizations are needed.
Among all these structures, DataFrames are the most commonly used.
Data Manipulation with DataFrames and SQL
Using Spark SQL for Relational Queries (🟢, but worth checking out)
The book explains that Spark allows relational queries using familiar SQL syntax, but that’s something we already know. What I want to highlight here is when to use Spark SQL or Spark APIs, and even discuss the controversial topic of which one is faster.
When to use each one?
- If you’re performing many query operations and are already familiar with SQL, Spark SQL is a great choice.
- If you need more control and flexibility or are dealing with more complex operations, use Spark APIs.
- In the end, you can mix both! Use whatever is most convenient for your needs.
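As a quick illustration of mixing the two (the table and column names here are hypothetical), you can register a DataFrame as a temporary view, query it with SQL, and keep going with the DataFrame API:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
spark = SparkSession.builder.appName("MixingSQLAndAPI").getOrCreate()
df = spark.read.parquet("employees.parquet")
# Expose the DataFrame to Spark SQL as a temporary view
df.createOrReplaceTempView("employees")
# Relational part in SQL...
engineers = spark.sql("SELECT name, salary FROM employees WHERE department = 'Engineering'")
# ...and the rest with the DataFrame API
engineers.withColumn("bonus", F.col("salary") * 0.1).show()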
Performance Comparison
When it comes to speed, Spark SQL and the DataFrame API usually end up performing the same, because both are compiled by the Catalyst Optimizer into the same execution plans; the real gap shows up against hand-written RDD code, which bypasses Catalyst entirely. For relational queries, Catalyst does a great job of optimizing whatever you write.
Here’s why:
- Query Optimization
- Catalyst analyzes your SQL queries and adjusts them to improve execution, reducing response time.
- Pipeline Execution
- Spark SQL executes multiple operations in a pipeline, meaning it reads and writes data more efficiently, which is useful for complex queries.
- Efficient Resource Usage
- Spark SQL is designed to use memory and cluster resources intelligently, ensuring everything runs smoothly and quickly.
- Broadcast Joins
- When dealing with smaller tables, Spark SQL can use broadcast joins, sending small tables to all nodes, speeding up the join operation.
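You can also ask for this behavior explicitly with a broadcast hint; here is a minimal sketch (the tables are hypothetical):
from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast
spark = SparkSession.builder.appName("BroadcastJoinExample").getOrCreate()
orders = spark.read.parquet("orders.parquet")         # large table
countries = spark.read.parquet("countries.parquet")   # small lookup table
# Hint Spark to ship the small table to every executor instead of shuffling both sides
joined = orders.join(broadcast(countries), on="country_id", how="left")
joined.explain()  # the plan should show a BroadcastHashJoin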
Summary:
- For relational queries, Spark SQL and DataFrames are usually equally fast, because both benefit from the same optimizations and resource utilization.
- The lower-level RDD API might still be the better fit for complex logic or non-relational data, even though it gives up those optimizations.
- Ultimately, performance depends on the data and the operations being performed.
Internal Optimizations: Catalyst Optimizer, Tungsten Execution Engine, and Memory Management (🔴)
The book briefly mentions Catalyst Optimizer, but I wanted to expand on this topic and dive deeper into internal optimizations, providing a more complete view of the subject.
Catalyst Optimizer
As previously explained, Catalyst Optimizer is responsible for optimizing queries in Spark SQL, DataFrames, and Datasets. It applies several transformations to make execution more efficient.
Catalyst operates in different phases:
Phases of Catalyst Optimizer
- Analysis
- The SQL code or DataFrame is converted into a Logical Plan based on the data structure (Schema).
- It checks for errors, such as missing columns or incompatible data types.
- Optimization
- Applies Rewrite Rules to improve the query. Examples:
  - Predicate Pushdown: Filters (WHERE conditions) are pushed earlier in execution, reducing the amount of processed data.
  - Constant Folding: Constant expressions are precomputed before execution to avoid unnecessary calculations.
- Physical Plan Generation
- The logical plan is converted into a Physical Plan, where execution strategies are selected.
- Spark decides between Broadcast Join, Sort Merge Join, Shuffle Hash Join, depending on data size.
- The final code is generated using optimized Java bytecode to improve execution speed.
Didn’t fully understand?
Let’s see an example of Predicate Pushdown and Constant Folding:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("CatalystOptimization").getOrCreate()
df = spark.read.parquet("employees.parquet")
df_filtered = df.filter("salary > 50000 AND department = 'Engineering' AND 1 = 1")\
.select("name", "salary", "department")
df_filtered.explain(True)
Predicate Pushdown
- The filter salary > 50000 AND department = 'Engineering' will be applied before reading the data, reducing the amount of information loaded into memory.
This works especially well with columnar formats like Parquet and ORC.
Constant Folding
Catalyst detects that 1 = 1 does not affect the query and simply removes it.
Column Pruning
Since we are selecting only name, salary, and department, Spark does not load the other columns, reducing memory usage and processing.
Do you understand it better now?
Tungsten Execution Engine
The Tungsten Execution Engine improves performance by cutting JVM overhead (object creation, garbage collection) and optimizing CPU and memory usage.
Key Improvements in Tungsten
Whole-Stage Code Generation (WSCG):
Spark fuses multiple operators into a single generated function, compiled at runtime into optimized JVM bytecode, which reduces interpretation overhead and virtual function calls.
Pipeline Execution:
Operations are chained together, reducing the need to write intermediate data to disk.
Vectorized Query Execution:
Instead of processing row by row, Spark processes multiple rows at the same time in batches, reducing function calls.
Example of Vectorized Execution
df = spark.read.parquet("large_dataset.parquet")
df.select("id", "salary").summary().show()
If the data is in Parquet, Spark will use vectorized execution to reduce computational cost.
Enabling Vectorized Execution Manually
If it is not enabled by default:
spark.conf.set("spark.sql.parquet.enableVectorizedReader", "true")
Memory Management in Spark
Spark uses a hybrid memory and disk model, ensuring that processing continues even when RAM is insufficient.
Memory Components in Spark
Executor memory in Spark is divided into:
- Storage Memory: Stores cached RDD and DataFrame data.
- Execution Memory: Used for operations like Sort, Aggregations, and Joins.
If Execution Memory needs more space, it can take part of Storage Memory, and vice versa.
Configuring Memory in Spark
We can adjust the memory available for executors:
spark.conf.set("spark.executor.memory", "4g") # sets 4GB for each executor
spark.conf.set("spark.memory.fraction", "0.6") # 60% of memory will be used for execution
Easy?
Spill to Disk (Fallback)
If memory runs out, Spark writes temporary data to disk.
This hurts performance, so we should avoid it, for example by increasing spark.executor.memory.
Tip: Always use .explain(True) to understand how Spark is optimizing your queries!
Reading and Writing Data (🟢)
Read
Now, let’s move from a complex topic to a much simpler one.
The book presents connectors for different sources: CSV, JSON, Parquet, Avro, ORC.
I will summarize everything in a single code example for you.
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("DataConnectors").getOrCreate()
# Reading CSV
df_csv = spark.read.option("header", "true").option("inferSchema", "true").csv("data.csv")
df_csv.show()
# Reading JSON
df_json = spark.read.option("multiline", "true").json("data.json")
df_json.show()
# Reading Parquet
df_parquet = spark.read.parquet("data.parquet")
df_parquet.show()
# Reading Avro
df_avro = spark.read.format("avro").load("data.avro")
df_avro.show()
# Reading ORC
df_orc = spark.read.orc("data.orc")
df_orc.show()
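Writing follows the same pattern; a quick sketch (output paths are placeholders, and the partitioning column is just an example) reusing the df_csv DataFrame from above:
# Writing Parquet (overwriting the path if it already exists)
df_csv.write.mode("overwrite").parquet("output/data_parquet")
# Writing CSV with a header
df_csv.write.mode("overwrite").option("header", "true").csv("output/data_csv")
# Writing JSON
df_csv.write.mode("overwrite").json("output/data_json")
# Writing partitioned by a column (assuming a 'year' column exists)
df_csv.write.mode("overwrite").partitionBy("year").parquet("output/data_by_year")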
Integration with Databases like MySQL, PostgreSQL, and NoSQL (Cassandra, MongoDB)
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("DatabaseConnectors").getOrCreate()
# MySQL Connection
df_mysql = spark.read \
.format("jdbc") \
.option("url", "jdbc:mysql://localhost:3306/db_name") \
.option("driver", "com.mysql.cj.jdbc.Driver") \
.option("dbtable", "table_name") \
.option("user", "username") \
.option("password", "password") \
.load()
df_mysql.show()
# PostgreSQL Connection
df_postgres = spark.read \
.format("jdbc") \
.option("url", "jdbc:postgresql://localhost:5432/db_name") \
.option("driver", "org.postgresql.Driver") \
.option("dbtable", "table_name") \
.option("user", "username") \
.option("password", "password") \
.load()
df_postgres.show()
# Cassandra Connection
df_cassandra = spark.read \
.format("org.apache.spark.sql.cassandra") \
.option("keyspace", "keyspace_name") \
.option("table", "table_name") \
.load()
df_cassandra.show()
# MongoDB Connection
df_mongo = spark.read \
.format("mongo") \
.option("uri", "mongodb://localhost:27017/db_name.collection_name") \
.load()
df_mongo.show()
Using Delta Lake for Versioning and Managing Transactional Tables (🟡)
Let’s raise the level of our conversation again. The book introduces Delta Lake a little (too little, actually), so I’ll present some additional insights.
Please, do not confuse Delta Lake with Data Lake!
Delta Lake is like an upgrade for Spark that makes your data lake more secure, fast, and reliable. It uses the Parquet format but with version control and transaction support.
What makes Delta Lake so good?
- ACID Transactions – Prevents data corruption and ensures changes are applied correctly.
- Versioning (Time Travel) – Allows accessing previous versions of the table.
- Schema Evolution – Accepts changes to the table structure without errors.
- Optimized Merge, Upsert, and Delete – Updating or deleting data becomes much easier.
Let’s see some examples:
PySpark Example
Creating a Delta Table
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("DeltaLakeExample") \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
.getOrCreate()
data = [(1, "Alice", 5000), (2, "Bob", 6000)]
df = spark.createDataFrame(data, ["id", "name", "salary"])
df.write.format("delta").mode("overwrite").save("/tmp/delta_table")
Here, the SparkSession is configured with the Delta Lake extensions, allowing Spark to understand and work with this format. Notice that the “delta” format is used: under the hood, Delta stores the data as Parquet files plus a transaction log that tracks every change.
With the Delta table, it’s possible to “go back in time” and navigate through its modifications to recover versions. I’ve already written a detailed article explaining how Delta Lake’s TimeTravel works, if you’d like to dive deeper:
How to go back in time and access historical versions of your tables
But here’s a quick example:
df_old_version = spark.read.format("delta").option("versionAsOf", 0).load("/tmp/delta_table")
df_old_version.show()
Here we are reading the data saved earlier in Delta Lake, but with a detail:
.option("versionAsOf", 0) → This tells Spark to load version 0 of the table, i.e., the first version created.
Delta Lake keeps a history of changes to the table. Every time data is overwritten or updated, it creates a new version (e.g., version 0, version 1, version 2…).
Upsert with MERGE
from delta.tables import DeltaTable
updates = spark.createDataFrame([
(1, "Alice", 7000), # updating salary
(3, "Charlie", 8000) # new record
], ["id", "name", "salary"])
delta_table = DeltaTable.forPath(spark, "/tmp/delta_table")
delta_table.alias("old") \
.merge(updates.alias("new"), "old.id = new.id") \
.whenMatchedUpdate(set={"salary": "new.salary"}) \
.whenNotMatchedInsert(values={"id": "new.id", "name": "new.name", "salary": "new.salary"}) \
.execute()
This code performs a MERGE (upsert) on a Delta Lake table, i.e., it updates existing records and inserts new records efficiently and transactionally.
Delta Lake checks if new data already exists in the table and decides what to do:
- If the id exists (whenMatchedUpdate) → Updates the salary.
- If the id doesn’t exist (whenNotMatchedInsert) → Inserts the new record.
Why is this more efficient?
Well, first of all, you stop doing workarounds when you need to modify the table (đź‘€)
And also because this prevents data duplication, maintains a history of changes (Delta Lake versions everything), is more efficient than deleting and rewriting the entire table, etc.
Data Processing in Streaming (🔴)
In the book, the part about streaming data processing is very comprehensive and includes several interesting code examples. This is one of the parts I liked the most.
I won’t share the exact excerpt here because I don’t want copyright issues, but I’ll introduce a bit of what’s presented in the book, with my view and knowledge.
Structured Streaming is the Spark API for continuous data processing. It allows processing data streams almost in real-time, using the same DataFrame and SQL structure we already know.
Structured Streaming works in a declarative way, i.e., you write the code as if you’re dealing with a static DataFrame, but Spark treats the data as a continuous stream.
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("StreamingExample").getOrCreate()
df_stream = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load()
df_stream.writeStream.format("console").start().awaitTermination()
In this example, Spark is reading real-time messages from a socket (port 9999) and printing them to the screen.
Structured Streaming can operate in two different ways:
Micro-Batch (Default)
The data stream is processed in small batches. Each batch has a fixed time interval (e.g., process new data every 1 second). Good performance and compatibility with most data sources.
Continuous Processing (Low Latency)
The data is processed record by record, without waiting to form a batch. Ideal for low latency, but less compatible with some operations.
df_stream.writeStream.format("console").trigger(continuous="1 second").start()
With this code, processing happens continuously without waiting for batches to form.
Spark can consume data from various streaming sources:
🔌 Source → Use Case Example
- Kafka → Application logs, user events
- Kinesis → Integration with AWS, IoT
- Socket → Local testing, network messages
- Files → Monitoring a directory in real time (e.g., logs)
Example: Reading a data stream from Kafka
df_kafka = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "my_topic") \
.load()
Here, Spark is consuming messages from the my_topic topic in Kafka. Whenever a new message arrives, Spark captures it, processes it, and can apply transformations in real-time. This enables continuous analytics like filtering, aggregation, and even writing the results to another system, like a database or another Kafka topic.
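As a sketch of that idea (the JSON schema, topic fields, and output paths are assumptions for illustration), here is a windowed aggregation over the df_kafka stream above, written out as Parquet in micro-batches:
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType
# Hypothetical schema of the JSON messages in the topic
schema = StructType([
    StructField("user_id", StringType()),
    StructField("amount", DoubleType()),
    StructField("event_time", TimestampType()),
])
# Kafka delivers the payload as bytes in the 'value' column
events = df_kafka.selectExpr("CAST(value AS STRING) AS json") \
    .select(F.from_json("json", schema).alias("data")) \
    .select("data.*")
# Aggregate per user over 5-minute windows, tolerating 10 minutes of late data
agg = events.withWatermark("event_time", "10 minutes") \
    .groupBy(F.window("event_time", "5 minutes"), "user_id") \
    .agg(F.sum("amount").alias("total_amount"))
query = agg.writeStream \
    .format("parquet") \
    .outputMode("append") \
    .option("path", "/tmp/stream_output") \
    .option("checkpointLocation", "/tmp/stream_checkpoints") \
    .trigger(processingTime="1 minute") \
    .start()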
Structured Streaming ensures fault tolerance using Checkpoints and Write-Ahead Logs (a logging mechanism that guarantees durability and fault tolerance in data processing).
Checkpointing stores the state of the application, allowing it to resume from where it left off in case of failure.
df_stream.writeStream \
.format("parquet") \
.option("checkpointLocation", "/tmp/checkpoints") \
.start()
If Spark restarts, it will continue from the last saved state in /tmp/checkpoints.
Summary of the Summary
- Structured Streaming → Declarative API for processing data streams as DataFrames.
- Micro-Batch → Processes in small batches (default).
- Continuous Processing → Processes data one-by-one (lower latency).
- Data Sources → Kafka, Kinesis, sockets, and real-time files.
- Checkpointing → Prevents data loss and ensures fault tolerance.
Machine Learning and Artificial Intelligence (🔴)
Unlike the previous topic, where the book is quite comprehensive and presents various examples, in this case, the book is somewhat shallow and focuses on explaining MLlib and K-means superficially.
I understand that if the author went deeper into the topic, it would require a whole book just for that, as it is a complex subject that demands a lot of dedication. So, instead of detailing the content excessively, I will take a different approach: I’ll talk less about what it is and show more how to do it, presenting several code examples.
This part is quite interesting, especially given how relevant the topic is today. What I will present here is just an introduction, and I recommend that you dive deeper into the subject elsewhere, especially if you wish to become a sought-after professional in the future.
Machine Learning (ML) in Apache Spark
Machine Learning (ML) in Apache Spark is quite easy to use with the MLlib library. It has several tools for working with machine learning models on large volumes of data.
Introduction to MLlib
MLlib is Spark’s machine learning library, designed to handle large, distributed data. It provides efficient algorithms for common ML tasks like regression, classification, clustering, and dimensionality reduction.
Data Preprocessing
Before training a model, the data needs some adjustments, such as:
- Cleaning: removing null values or correcting data.
- Feature engineering: creating new variables from the existing ones.
- Categorical variable encoding (e.g., OneHotEncoding or StringIndexer).
- Scaling numerical variables (e.g., using StandardScaler or MinMaxScaler).
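Here is a minimal preprocessing sketch that ties these pieces together (the file and column names are hypothetical):
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler
spark = SparkSession.builder.appName("PreprocessingExample").getOrCreate()
data = spark.read.csv("raw_data.csv", header=True, inferSchema=True)
# Cleaning: drop rows with null values
data = data.dropna()
# Encode a categorical column as an index, then one-hot encode it
indexer = StringIndexer(inputCol="department", outputCol="department_idx")
encoder = OneHotEncoder(inputCols=["department_idx"], outputCols=["department_vec"])
# Assemble numeric and encoded features into a single vector, then scale it
assembler = VectorAssembler(inputCols=["age", "salary", "department_vec"], outputCol="features_raw")
scaler = StandardScaler(inputCol="features_raw", outputCol="features")
data = indexer.fit(data).transform(data)
data = encoder.fit(data).transform(data)
data = assembler.transform(data)
data = scaler.fit(data).transform(data)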
Supported Models
MLlib has several models, such as:
- Regression: To predict continuous values (e.g., linear regression).
- Classification: To categorize data into classes (e.g., decision tree, SVM).
- Clustering: To group similar data (e.g., K-Means).
- Dimensionality reduction: To simplify data without losing information (e.g., PCA).
Pipelines and Hyperparameter Tuning
- Pipelines: Help organize ML steps, like preprocessing, training, and evaluating models, all in a reusable workflow.
- Hyperparameter Tuning: Refers to finding the best parameters to optimize the model. This can be done with techniques like Grid Search and Random Search, using Spark’s CrossValidator.
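Before the numbered examples, here is a minimal Pipeline sketch (using the same hypothetical classification_data.csv and column names as the examples below), chaining feature assembly and a model into one reusable workflow:
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression
spark = SparkSession.builder.appName("PipelineExample").getOrCreate()
data = spark.read.csv("classification_data.csv", header=True, inferSchema=True)
train_data, test_data = data.randomSplit([0.7, 0.3])
# Stage 1: assemble raw columns into a feature vector
assembler = VectorAssembler(inputCols=["feature1", "feature2", "feature3"], outputCol="features")
# Stage 2: the estimator itself
lr = LogisticRegression(featuresCol="features", labelCol="label")
# The Pipeline runs the stages in order and returns a single reusable model
pipeline = Pipeline(stages=[assembler, lr])
model = pipeline.fit(train_data)
model.transform(test_data).select("label", "prediction").show()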
Let’s look at some examples!!!!
Useful Apache Spark Code Examples for Machine Learning using MLlib
I’ll cover common cases like classification, regression, clustering, and pipelines.
1. Linear Regression Example (Regression)
This example uses linear regression to predict a continuous value, like the price of a house, based on predictor variables.
from pyspark.sql import SparkSession
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler
spark = SparkSession.builder.appName("LinearRegressionExample").getOrCreate()
data = spark.read.csv("data.csv", header=True, inferSchema=True)
# Prepare data (Create features from columns)
assembler = VectorAssembler(inputCols=["feature1", "feature2", "feature3"], outputCol="features")
data = assembler.transform(data)
# Split data into train and test sets
train_data, test_data = data.randomSplit([0.8, 0.2])
# Initialize and train the model
lr = LinearRegression(featuresCol="features", labelCol="label")
lr_model = lr.fit(train_data)
# Evaluate the model
test_results = lr_model.evaluate(test_data)
print("RMSE: ", test_results.rootMeanSquaredError)
In this case, we perform linear regression using PySpark. When run, it will print the root mean squared error (RMSE) of the linear regression model on the test data.
RMSE (Root Mean Squared Error) is a metric used to evaluate model accuracy. The lower the RMSE value, the better the model.
Therefore, what will be printed on the console is:
RMSE: <calculated RMSE value>
2. Random Forest Classification Example
This example uses Random Forest for a classification problem (e.g., predicting whether an email is spam or not).
from pyspark.sql import SparkSession
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
spark = SparkSession.builder.appName("RandomForestExample").getOrCreate()
data = spark.read.csv("classification_data.csv", header=True, inferSchema=True)
# Prepare data
assembler = VectorAssembler(inputCols=["feature1", "feature2", "feature3"], outputCol="features")
data = assembler.transform(data)
# Split data
train_data, test_data = data.randomSplit([0.7, 0.3])
# Initialize and train the model
rf = RandomForestClassifier(featuresCol="features", labelCol="label")
rf_model = rf.fit(train_data)
# Evaluate the model
predictions = rf_model.transform(test_data)
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Accuracy: ", accuracy)
We use a Random Forest classifier to perform classification on a dataset and calculate the accuracy of the model on test data. What will be printed to the console is the accuracy of the model.
The “accuracy” value depends on the data present in the classification_data.csv file and how well the model fits the training and testing data. Accuracy is an evaluation metric that indicates the proportion of correct predictions made by the model compared to the total predictions.
Therefore, what will be printed in the console is:
Accuracy: <calculated accuracy value>
This value will represent the accuracy rate of the Random Forest model on the test data.
3. K-Means Clustering Example
This example uses the K-Means algorithm to cluster data, such as segmenting customers based on similar characteristics.
from pyspark.sql import SparkSession
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import ClusteringEvaluator
# Start SparkSession
spark = SparkSession.builder.appName("KMeansExample").getOrCreate()
# Load data
data = spark.read.csv("customer_data.csv", header=True, inferSchema=True)
# Prepare data
assembler = VectorAssembler(inputCols=["feature1", "feature2", "feature3"], outputCol="features")
data = assembler.transform(data)
# Train the model
kmeans = KMeans().setK(3).setSeed(1)
model = kmeans.fit(data)
# Predictions and evaluation
predictions = model.transform(data)
evaluator = ClusteringEvaluator()
silhouette = evaluator.evaluate(predictions)
print("Silhouette With Set of Clusters: ", silhouette)
We use the K-Means algorithm to calculate the silhouette score, which is a metric used to evaluate the quality of clustering.
The silhouette value ranges from -1 to 1:
- A value close to 1 indicates well-defined clusters.
- A value close to 0 indicates poorly defined clusters.
- A negative value indicates the data might be grouped incorrectly.
Therefore, what will be printed to the console will be something like:
Silhouette With Set of Clusters: <calculated silhouette metric value>
This value depends on the data in the customer_data.csv file and the clusters generated by the K-Means model.
4. Hyperparameter Tuning with GridSearch Example
Here’s how to use GridSearch to optimize hyperparameters in the Random Forest model.
from pyspark.sql import SparkSession
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import VectorAssembler
spark = SparkSession.builder.appName("GridSearchExample").getOrCreate()
data = spark.read.csv("classification_data.csv", header=True, inferSchema=True)
# Prepare data
assembler = VectorAssembler(inputCols=["feature1", "feature2", "feature3"], outputCol="features")
data = assembler.transform(data)
# Split data
train_data, test_data = data.randomSplit([0.7, 0.3])
# Initialize RandomForest
rf = RandomForestClassifier(featuresCol="features", labelCol="label")
# Build parameter grid
paramGrid = ParamGridBuilder().addGrid(rf.numTrees, [10, 20]) \
.addGrid(rf.maxDepth, [5, 10]).build()
# Evaluator
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
# CrossValidator
cv = CrossValidator(estimator=rf, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=3)
# Train the model
cvModel = cv.fit(train_data)
# Evaluate the model
predictions = cvModel.transform(test_data)
accuracy = evaluator.evaluate(predictions)
print("Best Model Accuracy: ", accuracy)
We perform Grid Search combined with Cross-Validation to find the best hyperparameters for a Random Forest model and then evaluate its accuracy on test data.
The code will print the accuracy of the best model found by cross-validation. The exact value will depend on the data in the classification_data.csv file and how the Random Forest model fits the data.
The output will be something like:
Best Model Accuracy: <calculated accuracy value>
This value represents the accuracy rate of the model on the test data after optimizing hyperparameters (numTrees and maxDepth).
Data Processing with GraphX (🟡)
The book introduces us to GraphX, and it’s a REALLY interesting tool!
It is the Spark library for working with graphs, i.e., connected data, like social networks, recommendation systems, and analysis of relationships between entities. It allows efficient processing of large volumes of distributed data.
In GraphX, a graph is composed of:
- Vertices (nodes): Represent objects, such as users in a social network.
- Edges: Connect vertices and represent relationships (e.g., one user follows another).
- Attributes: Both vertices and edges can have attributes, like the weight of a connection.
Spark allows creating graphs from DataFrames, RDDs, or loading from sources like Parquet files and databases.
Common GraphX Algorithms
GraphX comes with some pre-built algorithms to simplify graph analysis. Some of the most used ones are:
- PageRank: Calculates the importance of each node in a graph (widely used in search engines like Google).
- Connected Components: Identifies groups of connected nodes (useful for discovering communities in social networks).
- Shortest Paths: Finds the shortest path between two nodes (e.g., calculating the shortest route between cities on a map).
GraphX is very useful when we need to analyze connections between elements. Some examples include:
- Social networks: Discover influencers with PageRank or identify user communities.
- Recommendation systems: Find relationships between users and products to suggest new purchases.
- Fraud analysis: Detect suspicious patterns in financial transactions.
In the end, GraphX turns complex relationships into valuable insights, leveraging Spark’s scalability to handle large volumes of connected data.
Here’s an example of calculating the PageRank of a user network (in PySpark this goes through the GraphFrames package, since the native GraphX API is Scala-only)!
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from graphframes import GraphFrame
# SparkSession with support for GraphFrames
spark = SparkSession.builder \
.appName("GraphXExample") \
.config("spark.jars.packages", "graphframes:graphframes:0.8.2-spark3.0-s_2.12") \
.getOrCreate()
# Creating vertices (nodes of the graph)
vertices = spark.createDataFrame([
(1, "Alice"),
(2, "Bob"),
(3, "Charlie"),
(4, "David")
], ["id", "name"])
# Creating edges (connections between nodes)
edges = spark.createDataFrame([
(1, 2),
(2, 3),
(3, 1),
(3, 4),
(4, 2)
], ["src", "dst"])
# Creating the graph
graph = GraphFrame(vertices, edges)
# Applying the PageRank algorithm
page_rank = graph.pageRank(resetProbability=0.15, maxIter=10)
# Showing the results
print("PageRank of the vertices:")
page_rank.vertices.select("id", "pagerank").orderBy(col("pagerank").desc()).show()
This code creates the SparkSession with support for GraphFrames, defines vertices (people in the social network), defines edges (who is connected to whom), creates the graph using GraphFrame, applies the PageRank algorithm to measure the importance of each node, and displays the results ordered by the highest PageRank values.
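The other algorithms mentioned earlier follow the same pattern. A quick sketch on the same graph (note that connectedComponents requires a checkpoint directory; the landmark vertex is just an example):
# Connected Components needs a checkpoint directory configured first
spark.sparkContext.setCheckpointDir("/tmp/graph_checkpoints")
components = graph.connectedComponents()
components.select("id", "name", "component").show()
# Shortest paths from every vertex to the chosen landmark (here, vertex 1)
paths = graph.shortestPaths(landmarks=[1])
paths.select("id", "distances").show()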
Performance Optimization in Spark (🔴)
In my opinion, this is one of the most important and useful parts of the book. It covers partitioning and parallelism, efficient use of cache and persistence, execution configuration tuning (memory tuning, parallelism, shuffle), and strategies to avoid costly operations, such as wide transformations and unnecessary shuffles.
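Just to ground those ideas, here is a small sketch (paths, column names, and the numbers are illustrative) touching partitioning, caching, and shuffle configuration:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("PerfTuningExample").getOrCreate()
# Shuffle parallelism: tune the number of partitions produced by wide operations
spark.conf.set("spark.sql.shuffle.partitions", "200")
df = spark.read.parquet("events.parquet")
# Repartition by the join/group key to spread data evenly before wide operations
df = df.repartition(100, "user_id")
# Cache a DataFrame that will be reused several times
df.cache()
df.count()  # the first action materializes the cache
# Filter early so wide transformations (groupBy, join) touch less data
daily = df.filter("event_date >= '2024-01-01'").groupBy("user_id").count()
daily.explain(True)  # inspect the plan to spot unnecessary shuffles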
I made a comprehensive post on my website explaining each of these concepts (and a few more), so it’s worth checking out!
These were my highlights from the book! I hope they were helpful and that you have a good understanding of the material. But don’t forget to read the book on your own—I highly recommend it! Also, making your own highlights can help you absorb each topic even more.
If you have any questions or want to share feedback, feel free to contact me!