Getting Started with Spark Streaming
This is a follow-along tutorial for beginners who want hands-on experience with Spark Streaming.
Prerequisites
- Prior knowledge of spark
- Spark installed locally
For both prerequisites, go through my previous blog:
https://medium.com/data-engineering-on-cloud/pyspark-with-examples-96919b3d1192
Spark Streaming
Spark Streaming is used where there is a stream of data, i.e. data that is continuously generated by different sources. For example, websites such as Twitter and Facebook are continuously fed data by billions of users, and these sites need to apply analytics on the fly as the data arrives.
Spark provides multiple streaming input options, called sources:
File Source: When you get streaming data as incremental files.
Kafka Source: Kafka is a messaging system built around a message broker that supports topics. Producers publish messages to topics, and consumers read messages from them.
Socket Source and Rate Source: The Socket source reads data from socket connections, and the Rate source generates rows at a specified rate; both are typically used for testing.
Let's initialize a Spark session
import os
import sys
import glob
from os.path import abspath

# Point Spark, Java, and Hadoop (winutils) at the local installation paths
os.environ['SPARK_HOME'] = r'C:\spark-3.1.2-bin-hadoop3.2'
os.environ['JAVA_HOME'] = r'C:\Program Files\Java\jdk1.8.0_201'
os.environ['HADOOP_HOME'] = r'C:\spark-3.1.2-bin-hadoop3.2'

# Make Spark's Python bindings and the bundled py4j library importable
spark_python = os.path.join(os.environ.get('SPARK_HOME', None), 'python')
py4j = glob.glob(os.path.join(spark_python, 'lib', 'py4j-*.zip'))[0]
sys.path[:0] = [spark_python, py4j]
os.environ['PYTHONPATH'] = py4j

import findspark
findspark.init()
findspark.find()

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Spark Examples").enableHiveSupport().getOrCreate()
In this tutorial, we will be using the File source.
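For reference, here is a minimal sketch of what reading from the Rate source looks like; it is not part of this tutorial, and the rate_df and rate_query names are my own. The Rate source simply generates rows of (timestamp, value) at the requested rate.
# Rate source sketch: generates rows at a fixed rate, handy for quick experiments
rate_df = spark.readStream.format("rate").option("rowsPerSecond", 5).load()
rate_query = rate_df.writeStream.format("console").outputMode("append").start()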
Streaming DataFrame on a Directory
Let's go ahead and create an empty directory named temperature-data. I created it inside my Documents folder: C:\\Users\\ADMIN\\Documents\\temperature-data
The dataset that we will create has two columns (day and tempInCelcius).
First and foremost, we need to create a schema.
from pyspark.sql.types import StructType
temperatureSchema = StructType().add("day", "string").add("tempInCelcius", "string")
The next step is to create a streaming DataFrame:
temperature_streaming_df = spark.readStream.option("sep", ",") \
.schema(temperatureSchema).csv("C:\\Users\\ADMIN\\Documents\\temperature-data")
The readStream method returns a DataStreamReader, which is used to read streaming data as a DataFrame.
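As a side note, the DataStreamReader accepts further options on a file source. The variant below is only a sketch with a name of my choosing (temperature_streaming_df_limited); the header and maxFilesPerTrigger options are standard file-source options but are not required for this tutorial.
# Optional variant: limit each micro-batch to one new file and declare that the
# CSV files carry a header row, so the header is not read as data
temperature_streaming_df_limited = spark.readStream.option("sep", ",") \
    .option("header", "true") \
    .option("maxFilesPerTrigger", 1) \
    .schema(temperatureSchema) \
    .csv("C:\\Users\\ADMIN\\Documents\\temperature-data")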
Streaming Query
Unless you start a sink on a streaming DataFrame, the stream is not actually active.
Sinks are where you write your output data. Spark supports the following types of sinks (a file-sink sketch follows this list):
File sink: The output is stored in a specified directory.
Kafka sink: The output is stored in Kafka topics.
Console sink: Used for debugging purposes; prints the data to the console.
Memory sink: Saves the output as an in-memory table, also mainly for debugging.
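To make the File sink concrete, here is a sketch of writing the stream out as CSV files instead of to the console. The output and checkpoint paths are placeholders I made up; the checkpointLocation option is required for file sinks so Spark can track progress and recover from failures.
# File sink sketch: append-only output plus a checkpoint directory for recovery
file_query = temperature_streaming_df.writeStream.format("csv") \
    .option("path", "C:\\Users\\ADMIN\\Documents\\temperature-output") \
    .option("checkpointLocation", "C:\\Users\\ADMIN\\Documents\\temperature-checkpoint") \
    .outputMode("append") \
    .start()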
The output mode defines how the data is written into the sink. Spark supports the following output modes (a Complete-mode sketch follows this list):
Append: Only the new rows from the incoming stream are appended to the sink.
Complete: The entire result table is rewritten to the sink on every trigger; this mode requires an aggregation.
Update: Only the rows that changed since the last trigger are written to the sink.
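Since Complete mode only applies when the query contains an aggregation, here is a small sketch (not part of the original walkthrough) that averages the temperature per day and rewrites the whole result to the console on every trigger. The avg_per_day and agg_query names are mine.
from pyspark.sql.functions import avg, col

# Aggregation sketch: Complete mode rewrites the full result table each trigger
avg_per_day = temperature_streaming_df.groupBy("day") \
    .agg(avg(col("tempInCelcius").cast("double")).alias("avgTemp"))
agg_query = avg_per_day.writeStream.format("console").outputMode("complete").start()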
I will be using three files for the demo.
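The file contents are not listed in the original post, so the rows below are made-up sample values that match the (day, tempInCelcius) layout; I include a header line because the SQL query later in the tutorial filters out the row where day equals 'day'. Create one such file at each step while the query is running, for example:
# Made-up sample content for tempData.csv; tempData2.csv and tempData3.csv
# can follow the same pattern with different rows
sample_rows = "day,tempInCelcius\nMonday,12\nTuesday,14\nWednesday,11\n"
with open("C:\\Users\\ADMIN\\Documents\\temperature-data\\tempData.csv", "w") as f:
    f.write(sample_rows)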
Step 1: Run the command below.
query = temperature_streaming_df.writeStream.format("console").outputMode("append").start()
Step 2: Create a CSV file named tempData.csv and place it inside the temperature-data folder.
Step 3: Observe the output on the console.
Step 4: Create another CSV file, tempData2.csv, and place it inside the temperature-data folder.
Step 5: Observe the output on the console.
Step 6: Create another CSV file, tempData3.csv, and place it inside the temperature-data folder.
Step 7: Observe the output on the console (a query-monitoring sketch follows these steps).
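While the query runs, a few calls are handy for inspecting and eventually stopping it; this is a small addition of mine, not part of the original steps.
# Inspect and stop the streaming query once the demo is done
print(query.status)          # shows whether a micro-batch is currently active
print(spark.streams.active)  # lists all streaming queries on this session
query.stop()                 # stops the stream before starting the next section's query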
Spark provides an exactly-once guarantee, which means that each record will be processed exactly once.
Applying Transformations on Streaming Data
Let us change the format from console to memory; by doing so, we ask Spark to save the data into an in-memory table that can be queried with SQL.
query = temperature_streaming_df.writeStream.format("memory").queryName("TemperatureData").outputMode("append").start()spark.sql("select * from TemperatureData").show()
spark.sql("select * from TemperatureData where day != 'day' and cast(tempInCelcius as double) >13").show()
This brings us to the end of our tutorial. I hope that after following this blog you have a better understanding of how streaming data is analyzed using Spark.
The GitHub link for the above code is https://github.com/shorya1996/PySpark/blob/main/spark_streaming.ipynb