A platform for cluster computing.
Spark lets you spread data and computations over clusters with multiple nodes (think of each node as a separate computer). Splitting up your data makes it easier to work with very large datasets because each node only works with a small amount of data.
As each node works on its own subset of the total data, it also carries out a part of the total calculations required, so that both data processing and computation are performed in parallel over the nodes in the cluster. This parallelism can make certain types of programming tasks much faster.
Connecting to a cluster
The first step in using Spark is connecting to a cluster.
In practice, the cluster will be hosted on a remote machine that's connected to all other nodes. There will be one computer, called the master, that manages splitting up the data and the computations. The master is connected to the rest of the computers in the cluster, which are called workers. The master sends the workers data and calculations to run, and they send their results back to the master.
Creating the connection is as simple as creating an instance of the SparkContext class. The class constructor takes a few optional arguments that allow you to specify the attributes of the cluster you're connecting to. An object holding all these attributes can be created with the SparkConf() constructor. Take a look at the documentation for all the details!
# Create the connection
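A minimal sketch of what this step might look like when experimenting locally; the master URL and app name below are illustrative placeholders, not part of the original exercise:

from pyspark import SparkConf, SparkContext

# Hypothetical configuration: run locally using all available cores
conf = SparkConf().setMaster("local[*]").setAppName("flights_app")

# Create the SparkContext from the configuration
sc = SparkContext(conf=conf)

# Verify the connection by printing the context and its Spark version
print(sc)
print(sc.version)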
Data in Spark
Spark's core data structure is the Resilient Distributed Dataset (RDD). This is a low-level object that lets Spark work its magic by splitting data across multiple nodes in the cluster. However, RDDs are hard to work with directly.
The Spark DataFrame was designed to behave a lot like a SQL table (a table with variables in the columns and observations in the rows). Not only are they easier to understand, DataFrames are also more optimized for complicated operations than RDDs.
When you start modifying and combining columns and rows of data, there are many ways to arrive at the same result, but some often take much longer than others. When using RDDs, it's up to the data scientist to figure out the right way to optimize the query, but the DataFrame implementation has much of this optimization built in!
To start working with Spark DataFrames, you first have to create a SparkSession object from your SparkContext. You can think of the SparkContext as your connection to the cluster and the SparkSession as your interface with that connection. Here, we will have a SparkSession called spark.

Creating multiple SparkSessions and SparkContexts can cause issues, so it's best practice to use the SparkSession.builder.getOrCreate() method. This returns an existing SparkSession if there's already one in the environment, or creates a new one if necessary!
# Import SparkSession from pyspark.sql
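A short sketch, using the variable name spark as the course convention mentioned above:

from pyspark.sql import SparkSession

# Create the session, or return the existing one if it's already there
spark = SparkSession.builder.getOrCreate()

# Print spark to verify it's a SparkSession
print(spark)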
Once you've created a SparkSession, you can start poking around to see what data is in your cluster! Your SparkSession has an attribute called catalog which lists all the data inside the cluster. This attribute has a few methods for extracting different pieces of information. One of the most useful is the .listTables() method, which returns the names of all the tables in your cluster as a list.
# Print the tables in the catalog
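A one-line sketch, assuming the SparkSession is called spark as above:

print(spark.catalog.listTables())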
# Running a query on this table
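A sketch of running a SQL query with .sql(); the flights table name and the exact query are assumptions based on the course data:

# Run a SQL query against the (assumed) flights table
query = "SELECT * FROM flights LIMIT 10"
flights10 = spark.sql(query)

# Show the first ten rows
flights10.show()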
Reading Spark data into pandas
# Don't change this query
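A sketch of pulling query results back into pandas with .toPandas(); the query and the variable names are illustrative placeholders, since the original exercise code isn't shown:

# Illustrative aggregation query against the assumed flights table
query = "SELECT origin, dest, COUNT(*) AS N FROM flights GROUP BY origin, dest"

# Run the query and convert the result to a pandas DataFrame
flight_counts = spark.sql(query)
pd_counts = flight_counts.toPandas()

# Print the head of pd_counts
print(pd_counts.head())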
Writing a pandas DataFrame to Spark
# Create pd_temp
The .createDataFrame() method takes a pandas DataFrame and returns a Spark DataFrame.

The output of this method is stored locally, not in the SparkSession catalog. This means that you can use all the Spark DataFrame methods on it, but you can't access the data in other contexts. For example, a SQL query (using the .sql() method) that references your DataFrame will throw an error. To access the data in this way, you have to save it as a temporary table.

You can do this using the .createTempView() Spark DataFrame method, which takes as its only argument the name of the temporary table you'd like to register. This method registers the DataFrame as a table in the catalog, but as this table is temporary, it can only be accessed from the specific SparkSession used to create the Spark DataFrame.

There is also the method .createOrReplaceTempView(). This safely creates a new temporary table if nothing was there before, or updates an existing table if one was already defined. You'll use this method to avoid running into problems with duplicate tables.
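A sketch of the pd_temp step referenced above, assuming numpy and pandas are available in the session:

import numpy as np
import pandas as pd

# Create pd_temp, a small pandas DataFrame of random numbers
pd_temp = pd.DataFrame(np.random.random(10))

# Create a Spark DataFrame from pd_temp
spark_temp = spark.createDataFrame(pd_temp)

# Register it as a temporary table called "temp" so SQL queries can see it
spark_temp.createOrReplaceTempView("temp")

# Check that the new table shows up in the catalog
print(spark.catalog.listTables())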
Reading a text file into Spark
# Don't change this file path
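A sketch of reading a CSV file straight into a Spark DataFrame; the file path here is a hypothetical placeholder:

# Hypothetical file path
file_path = "/usr/local/share/datasets/airports.csv"

# Read in the data, treating the first row as a header
airports = spark.read.csv(file_path, header=True)

# Show the data
airports.show()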
Manipulating data
In Spark you can create a new column using the .withColumn() method, which takes two arguments: first, a string with the name of your new column, and second, the new column itself. The new column must be an object of class Column. Creating one of these is as easy as extracting a column from your DataFrame using df.colName.

Updating a Spark DataFrame is somewhat different than working in pandas because the Spark DataFrame is immutable. This means that it can't be changed, and so columns can't be updated in place. Thus, all these methods return a new DataFrame. To overwrite the original DataFrame you must reassign the returned DataFrame, like so:
df = df.withColumn("newCol", df.oldCol + 1)
The above code creates a DataFrame with the same columns as df plus a new column, newCol, where every entry is equal to the corresponding entry from oldCol, plus one. To overwrite an existing column, just pass the name of the column as the first argument!
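For example, reusing the hypothetical df and oldCol from above, overwriting the existing column looks like this:

df = df.withColumn("oldCol", df.oldCol + 1)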
Filter Data
The .filter() method takes either an expression that would follow the WHERE clause of a SQL expression as a string, or a Spark Column of boolean (True/False) values. For example, the following two expressions will produce the same output:
flights.filter("air_time > 120").show()
flights.filter(flights.air_time > 120).show()
Select Data
The Spark variant of SQL's SELECT is the .select() method. This method takes multiple arguments - one for each column you want to select. These arguments can either be the column name as a string (one for each column) or a column object (using the df.colName syntax). When you pass a column object, you can perform operations like addition or subtraction on the column to change the data contained in it, much like inside .withColumn().
The difference between the .select() and .withColumn() methods is that .select() returns only the columns you specify, while .withColumn() returns all the columns of the DataFrame in addition to the one you defined. It's often a good idea to drop columns you don't need at the beginning of an operation so that you're not dragging around extra data as you're wrangling. In this case, you would use .select() and not .withColumn().

Similar to SQL, you can also use the .select() method to perform column-wise operations. When you're selecting a column using the df.colName notation, you can perform any column operation and the .select() method will return the transformed column. For example,
flights.select(flights.air_time/60)
returns a column of flight durations in hours instead of minutes. You can also use the .alias() method to rename a column you're selecting. So if you wanted to .select() the column duration_hrs (which isn't in your DataFrame) you could do
flights.select((flights.air_time/60).alias("duration_hrs"))
with the SQL as keyword being equivalent to the .alias() method. To select multiple columns, you can pass multiple strings.
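For example, selecting several columns by name might look like this (the column names are assumptions based on the flights data):

selected = flights.select("origin", "dest", "carrier")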
Aggregating
All of the common aggregation methods, like .min(), .max(), and .count(), are GroupedData methods. These are created by calling the .groupBy() DataFrame method. You'll learn exactly what that means in a few exercises. For now, all you have to do to use these functions is call that method on your DataFrame. For example, to find the minimum value of a column, col, in a DataFrame, df, you could do
df.groupBy().min("col").show()
This creates a GroupedData object (so you can use the .min() method), then finds the minimum value in col, and returns it as a DataFrame.
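As a concrete variant on the assumed flights table, the same pattern finds the longest recorded flight time:

flights.groupBy().max("air_time").show()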
Grouping and Aggregating I
Part of what makes aggregating so powerful is the addition of groups. PySpark has a whole class devoted to grouped data frames: pyspark.sql.GroupedData, which you saw in the last two exercises.

You've learned how to create a grouped DataFrame by calling the .groupBy() method on a DataFrame with no arguments. Now you'll see that when you pass the name of one or more columns in your DataFrame to the .groupBy() method, the aggregation methods behave like when you use a GROUP BY statement in a SQL query!
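For example, grouping by a column before aggregating (the flights table and its tailnum column are assumed from the course data) counts the number of flights each plane made:

flights.groupBy("tailnum").count().show()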
Grouping and Aggregating II
In addition to the GroupedData methods you've already seen, there is also the .agg() method. This method lets you pass an aggregate column expression that uses any of the aggregate functions from the pyspark.sql.functions submodule. This submodule contains many useful functions for computing things like standard deviations. All the aggregation functions in this submodule take the name of a column in a GroupedData table.
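A short sketch of .agg() with a function from pyspark.sql.functions; the flights table and its dep_delay column are assumptions:

# Import the functions submodule under a short alias
import pyspark.sql.functions as F

# Standard deviation of departure delay for each origin airport
flights.groupBy("origin").agg(F.stddev("dep_delay")).show()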
Joining
Another very common data operation is the join. Joins are a whole topic unto themselves, so in this course we'll just look at simple joins.
A join will combine two different tables along a column that they share. This column is called the key. Examples of keys here include the tailnum and carrier columns from the flights table.
For example, suppose that you want to know more information about the plane that flew a flight than just the tail number. This information isn't in the flights table because the same plane flies many different flights over the course of two years, so including this information in every row would result in a lot of duplication. To avoid this, you'd have a second table that has only one row for each plane and whose columns list all the information about the plane, including its tail number. You could call this table planes.

When you join the flights table to this table of airplane information, you're adding all the columns from the planes table to the flights table. To fill these columns with information, you'll look at the tail number from the flights table and find the matching one in the planes table, and then use that row to fill out all the new columns.
Joining II
In PySpark, joins are performed using the DataFrame method .join(). This method takes three arguments. The first is the second DataFrame that you want to join with the first one. The second argument, on, is the name of the key column(s) as a string. The names of the key column(s) must be the same in each table. The third argument, how, specifies the kind of join to perform. In this course we'll always use the value how="leftouter".
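For example, joining the assumed flights and planes tables on their shared tail number column might look like this:

flights_with_planes = flights.join(planes, on="tailnum", how="leftouter")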
Machine learning pipelines
In the next two chapters you'll step through every stage of the machine learning pipeline, from data intake to model evaluation. Let's get to it!
At the core of the pyspark.ml module are the Transformer and Estimator classes. Almost every other class in the module behaves similarly to these two basic classes.
Transformer classes have a .transform() method that takes a DataFrame and returns a new DataFrame; usually the original one with a new column appended. For example, you might use the class Bucketizer to create discrete bins from a continuous feature or the class PCA to reduce the dimensionality of your dataset using principal component analysis.
Estimator classes all implement a .fit() method. These methods also take a DataFrame, but instead of returning another DataFrame they return a model object. This can be something like a StringIndexerModel for including categorical data saved as strings in your models, or a RandomForestModel that uses the random forest algorithm for classification or regression.
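A minimal sketch of the Transformer pattern, assuming a flights DataFrame with a numeric air_time column; the bin edges are arbitrary:

from pyspark.ml.feature import Bucketizer

# Bin air_time into short, medium, and long flights
bucketizer = Bucketizer(splits=[0, 60, 120, float("inf")],
                        inputCol="air_time", outputCol="air_time_bin")
flights_binned = bucketizer.transform(flights)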
Join the DataFrames
In the next two chapters you'll be working to build a model that predicts whether or not a flight will be delayed based on the flights data we've been working with. This model will also include information about the plane that flew the route, so the first step is to join the two tables: flights and planes!
# Rename year column
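A sketch of this step under the assumption that both tables have a year column and share the tailnum key:

# Rename year on planes so it doesn't clash with year on flights
planes = planes.withColumnRenamed("year", "plane_year")

# Join the DataFrames on the tail number
model_data = flights.join(planes, on="tailnum", how="leftouter")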
Data types
Good work! Before you get started modeling, it's important to know that Spark only handles numeric data. That means all of the columns in your DataFrame must be either integers or decimals (called 'doubles' in Spark).
When we imported our data, we let Spark guess what kind of information each column held. Unfortunately, Spark doesn't always guess right and you can see that some of the columns in our DataFrame are strings containing numbers as opposed to actual numeric values.
To remedy this, you can use the .cast() method in combination with the .withColumn() method. It's important to note that .cast() works on columns, while .withColumn() works on DataFrames.

The only argument you need to pass to .cast() is the kind of value you want to create, in string form. For example, to create integers, you'll pass the argument "integer" and for decimal numbers you'll use "double".

You can put this call to .cast() inside a call to .withColumn() to overwrite the already existing column, just like you did in the previous chapter!
String to integer
Now you'll use the .cast() method you learned in the previous exercise to convert all the appropriate columns from your DataFrame model_data to integers! To convert the type of a column using the .cast() method, you can write code like this:
# Cast the columns to integers
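A sketch of the pattern, with arr_delay and air_time as assumed column names in model_data:

model_data = model_data.withColumn("arr_delay", model_data.arr_delay.cast("integer"))
model_data = model_data.withColumn("air_time", model_data.air_time.cast("integer"))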
Making a Boolean
Consider that you're modeling a yes or no question: is the flight late? However, your data contains the arrival delay in minutes for each flight. Thus, you'll need to create a boolean column which indicates whether the flight was late or not!
# Create is_late
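A sketch of this step, assuming an integer arr_delay column as created above; naming the target column label matches Spark ML's default, but both names here are assumptions:

# A flight is late if it arrived more than zero minutes after schedule
model_data = model_data.withColumn("is_late", model_data.arr_delay > 0)

# Convert the boolean to an integer for modeling
model_data = model_data.withColumn("label", model_data.is_late.cast("integer"))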
Strings and factors
As you know, Spark requires numeric data for modeling. So far this hasn't been an issue; even boolean columns can easily be converted to integers without any trouble. But you'll also be using the airline and the plane's destination as features in your model. These are coded as strings and there isn't any obvious way to convert them to a numeric data type.
Fortunately, PySpark has functions for handling this built into the pyspark.ml.feature submodule. You can create what are called 'one-hot vectors' to represent the carrier and the destination of each flight. A one-hot vector is a way of representing a categorical feature where every observation has a vector in which all elements are zero except for at most one element, which has a value of one (1). Each element in the vector corresponds to a level of the feature, so it's possible to tell what the right level is by seeing which element of the vector is equal to one (1).

The first step to encoding your categorical feature is to create a StringIndexer. Members of this class are Estimators that take a DataFrame with a column of strings and map each unique string to a number. Then, the Estimator returns a Transformer that takes a DataFrame, attaches the mapping to it as metadata, and returns a new DataFrame with a numeric column corresponding to the string column.
The second step is to encode this numeric column as a one-hot vector using a OneHotEncoder. This works exactly the same way as the StringIndexer by creating an Estimator and then a Transformer. The end result is a column that encodes your categorical feature as a vector that's suitable for machine learning routines!

This may seem complicated, but don't worry! All you have to remember is that you need to create a StringIndexer and a OneHotEncoder, and the Pipeline will take care of the rest.
Carrier & Destination
# Create a StringIndexer
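A sketch of the indexer/encoder pairs for the carrier and destination columns; the column and variable names are assumptions, and on Spark 2.3-2.4 the encoder class is OneHotEncoderEstimator instead:

from pyspark.ml.feature import StringIndexer, OneHotEncoder

# Map each carrier string to a numeric index, then one-hot encode that index
carr_indexer = StringIndexer(inputCol="carrier", outputCol="carrier_index")
carr_encoder = OneHotEncoder(inputCol="carrier_index", outputCol="carrier_fact")

# Same pattern for the destination
dest_indexer = StringIndexer(inputCol="dest", outputCol="dest_index")
dest_encoder = OneHotEncoder(inputCol="dest_index", outputCol="dest_fact")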
Assemble a vector
The last step in the Pipeline is to combine all of the columns containing our features into a single column. This has to be done before modeling can take place because every Spark modeling routine expects the data to be in this form. You can do this by storing each of the values from a column as an entry in a vector. Then, from the model's point of view, every observation is a vector that contains all of the information about it and a label that tells the modeler what value that observation corresponds to.

Because of this, the pyspark.ml.feature submodule contains a class called VectorAssembler. This Transformer takes all of the columns you specify and combines them into a new vector column.
# Make a VectorAssembler
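A sketch of the assembler; the list of input columns is an assumption about which features you ended up with:

from pyspark.ml.feature import VectorAssembler

# Combine the assumed feature columns into a single "features" vector column
vec_assembler = VectorAssembler(
    inputCols=["air_time", "carrier_fact", "dest_fact"],
    outputCol="features")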
Create the pipeline
You're finally ready to create a Pipeline!
Pipeline is a class in the pyspark.ml module that combines all the Estimators and Transformers that you've already created. This lets you reuse the same modeling process over and over again by wrapping it up in one simple object. Neat, right?
# Import Pipeline
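A sketch of wrapping the stages created in the earlier sketches into a single Pipeline:

from pyspark.ml import Pipeline

# Chain the indexers, encoders, and assembler (stage names assumed from above)
flights_pipe = Pipeline(stages=[carr_indexer, carr_encoder,
                                dest_indexer, dest_encoder,
                                vec_assembler])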
Test and Train
After you've cleaned your data and gotten it ready for modeling, one of the most important steps is to split the data into a test set and a train set. After that, don't touch your test data until you think you have a good model! As you're building models and forming hypotheses, you can test them on your training data to get an idea of their performance.
Once you've got your favorite model, you can see how well it predicts the new data in your test set. This never-before-seen data will give you a much more realistic idea of your model's performance in the real world when you're trying to predict or classify new data.
In Spark it's important to make sure you split the data after all the transformations. This is because operations like StringIndexer don't always produce the same index even when given the same list of strings.
Transform the data
# Fit and transform the data
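A one-step sketch, assuming the flights_pipe and model_data names from the earlier sketches:

# Fit the pipeline to the data, then transform the data with the fitted pipeline
piped_data = flights_pipe.fit(model_data).transform(model_data)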
Split the data
Now that you've done all your manipulations, the last step before modeling is to split the data!
# Split the data into training and test sets
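A sketch using .randomSplit(); the 60/40 proportions are a common choice, not a requirement:

# Hold out 40% of the data for testing
training, test = piped_data.randomSplit([.6, .4])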
Model tuning and selection
What is logistic regression?
The model you'll be fitting in this chapter is called a logistic regression. This model is very similar to a linear regression, but instead of predicting a numeric variable, it predicts the probability (between 0 and 1) of an event.
To use this as a classification algorithm, all you have to do is assign a cutoff point to these probabilities. If the predicted probability is above the cutoff point, you classify that observation as a 'yes' (in this case, the flight being late); if it's below, you classify it as a 'no'!
You'll tune this model by testing different values for several hyperparameters. A hyperparameter is just a value in the model that's not estimated from the data, but rather is supplied by the user to maximize performance. For this course it's not necessary to understand the mathematics behind all of these values - what's important is that you'll try out a few different choices and pick the best one.
Create the model
The Estimator you'll be using is a LogisticRegression from the pyspark.ml.classification submodule.
# Import LogisticRegression
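A sketch of creating the Estimator with its default hyperparameters:

from pyspark.ml.classification import LogisticRegression

# Create a LogisticRegression Estimator
lr = LogisticRegression()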
Cross validation
In the next few exercises you'll be tuning your logistic regression model using a procedure called k-fold cross validation. This is a method of estimating the model's performance on unseen data (like your test DataFrame).
It works by splitting the training data into a few different partitions. The exact number is up to you, but in this course you'll be using PySpark's default value of three. Once the data is split up, one of the partitions is set aside, and the model is fit to the others. Then the error is measured against the held out partition. This is repeated for each of the partitions, so that every block of data is held out and used as a test set exactly once. Then the error on each of the partitions is averaged. This is called the cross validation error of the model, and is a good estimate of the actual error on the held out data.
You'll be using cross validation to choose the hyperparameters by creating a grid of the possible pairs of values for the two hyperparameters, elasticNetParam and regParam, and using the cross validation error to compare all the different models so you can choose the best one!
Create the evaluator
The first thing you need when doing cross validation for model selection is a way to compare different models. Luckily, the pyspark.ml.evaluation submodule has classes for evaluating different kinds of models. Your model is a binary classification model, so you'll be using the BinaryClassificationEvaluator from the pyspark.ml.evaluation module.
This evaluator calculates the area under the ROC. This is a metric that combines the two kinds of errors a binary classifier can make (false positives and false negatives) into a simple number. You'll learn more about this towards the end of the chapter!
# Import the evaluation submodule
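A sketch of creating the evaluator; passing metricName is optional, since area under the ROC curve is the default metric:

import pyspark.ml.evaluation as evals

# Create a BinaryClassificationEvaluator that scores models by area under the ROC curve
evaluator = evals.BinaryClassificationEvaluator(metricName="areaUnderROC")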
Make a grid
Next, you need to create a grid of values to search over when looking for the optimal hyperparameters. The submodule pyspark.ml.tuning includes a class called ParamGridBuilder that does just that (maybe you're starting to notice a pattern here; PySpark has a submodule for just about everything!).

You'll need to use the .addGrid() and .build() methods to create a grid that you can use for cross validation. The .addGrid() method takes a model parameter (an attribute of the model Estimator, lr, that you created a few exercises ago) and a list of values that you want to try. The .build() method takes no arguments; it just returns the grid that you'll use later.
# Import the tuning submodule
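A sketch of building the grid for the two hyperparameters mentioned earlier; the specific candidate values are illustrative:

import numpy as np
import pyspark.ml.tuning as tune

# Create the parameter grid builder
grid = tune.ParamGridBuilder()

# Add the two hyperparameters and the values to try
grid = grid.addGrid(lr.regParam, np.arange(0, .1, .01))
grid = grid.addGrid(lr.elasticNetParam, [0, 1])

# Build the grid
grid = grid.build()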
Make the validator
The submodule pyspark.ml.tuning also has a class called CrossValidator for performing cross validation. This Estimator takes the modeler you want to fit, the grid of hyperparameters you created, and the evaluator you want to use to compare your models.
The submodule pyspark.ml.tuning has already been imported as tune. You'll create the CrossValidator by passing it the logistic regression Estimator lr, the parameter grid, and the evaluator you created in the previous exercises.
# Create the CrossValidator
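A sketch, reusing lr, grid, and evaluator from the previous sketches:

# Combine the estimator, the parameter grid, and the evaluator
cv = tune.CrossValidator(estimator=lr,
                         estimatorParamMaps=grid,
                         evaluator=evaluator)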
Fit the model(s)
You're finally ready to fit the models and select the best one!
Unfortunately, cross validation is a very computationally intensive procedure. Fitting all the models would take too long on DataCamp.
To do this locally you would use the code:
# Fit cross validation models
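On a machine with enough time and resources, this step might look like the following sketch (training and cv come from the earlier sketches):

# Fit all of the cross validation models to the training data
models = cv.fit(training)

# Extract the best-performing model
best_lr = models.bestModel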
Remember, the training data is called training and you're using lr to fit a logistic regression model. Cross validation selected the parameter values regParam=0 and elasticNetParam=0 as being the best. These are the default values, so you don't need to do anything else with lr before fitting the model.
# Call lr.fit()
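A one-line sketch of fitting the model directly with those default hyperparameters:

# Fit the logistic regression on the training data
best_lr = lr.fit(training)
print(best_lr)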
Evaluate the model
For this course we'll be using a common metric for binary classification algorithms called the AUC, or area under the curve. In this case, the curve is the ROC, or receiver operating characteristic curve. The details of what these things actually measure aren't important for this course. All you need to know is that for our purposes, the closer the AUC is to one (1), the better the model is!
# Use the model to predict the test set
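A sketch of the final evaluation step, reusing best_lr, test, and evaluator from the earlier sketches:

# Generate predictions on the test set
test_results = best_lr.transform(test)

# Evaluate the predictions (area under the ROC curve)
print(evaluator.evaluate(test_results))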