Pyspark Interview Preparation Part 3: Coding Practice
In this blog post we cover two PySpark interview questions. Both are case studies of the kind companies use to evaluate candidates.
Download the datasets used in the blog from:
https://github.com/shorya1996/PySpark/blob/main/Questions_dataset.zip
Question 1: Given the complete e-book of Pride and Prejudice, count the frequency of each word and return the top 10 words.
# Read the input data (assumes a Databricks notebook, where `spark` is already defined)
filename = "/FileStore/tables/pride_and_prejudice.txt"
book = spark.read.text(filename)
book.printSchema()
book.show()
# Tokenize
from pyspark.sql.functions import split, col
lines = book.select(split(col("value"), " ").alias("line"))
lines.show()
# Exploding a column of arrays into rows of elements
from pyspark.sql.functions import explode
words = lines.select(explode(col("line")).alias("word"))
words.show()
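The split-then-explode pattern can be mimicked in plain Python to see why a later filter on empty strings is needed: splitting on a single space turns consecutive spaces into empty-string tokens, and in this illustration a blank line also yields one empty token. The sample lines are invented; this is not Spark code.

```python
# Plain-Python illustration of split(col("value"), " ") followed by explode().
# The sample lines below are made up for illustration.
lines = [
    "It is a truth universally acknowledged,",
    "",                    # blank lines are common in the e-book
    "that a single  man",  # note the double space
]

# split(..., " ") produces one array of tokens per line.
arrays = [line.split(" ") for line in lines]

# explode() emits one row per array element, i.e. it flattens the arrays.
words = [word for array in arrays for word in array]

print(words)
```

Both empty tokens survive the explode step, which is why the pipeline only drops them after cleaning.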
# Cleaning: lowercase the words and remove punctuation
from pyspark.sql.functions import lower
words_lower = words.select(lower(col("word")).alias("word_lower"))
from pyspark.sql.functions import regexp_extract
words_clean = words_lower.select(regexp_extract(col("word_lower"), "[a-z]+", 0).alias("word"))
words_clean.show()
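`regexp_extract(col("word_lower"), "[a-z]+", 0)` keeps the first run of lowercase letters in each token and returns an empty string when nothing matches. A plain-Python equivalent built on `re.search` makes the edge cases visible: contractions are cut at the apostrophe, and punctuation-only or numeric tokens become empty strings. The helper name `extract_word` and the sample tokens are hypothetical.

```python
import re

# One or more lowercase ASCII letters, the same pattern as in the Spark code.
WORD_PATTERN = re.compile(r"[a-z]+")

def extract_word(token: str) -> str:
    """Mimic regexp_extract(lower(token), "[a-z]+", 0): first match, or "" if none."""
    match = WORD_PATTERN.search(token.lower())
    return match.group(0) if match else ""

for token in ["Elizabeth,", '"Mr.', "don't", "--", "1813"]:
    print(repr(token), "->", repr(extract_word(token)))
```

The empty strings produced here are exactly what `filter(col("word") != "")` removes in the next step.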
# Filtering and counting
words_nonnull = words_clean.filter(col("word") != "")
groups = words_nonnull.groupby(col("word")).count()
groups.show()
# Ordering result
groups.orderBy("count", ascending=False).show(10)
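To check the Spark output on a small sample, the whole pipeline collapses into a few lines of plain Python with `collections.Counter`. This is a sketch for local testing under the same tokenization rules, not a replacement for the Spark job, and the sample text is invented. It also breaks ties alphabetically, which `orderBy("count", ascending=False)` alone does not guarantee; in Spark, adding a second sort key, as in `groups.orderBy(col("count").desc(), col("word")).show(10)`, makes the top 10 deterministic.

```python
import re
from collections import Counter

def top_words(text: str, n: int = 10):
    """Split on spaces, lowercase, keep the first [a-z]+ run, drop empties, count."""
    counts = Counter()
    for line in text.splitlines():
        for token in line.split(" "):
            match = re.search(r"[a-z]+", token.lower())
            if match:  # same effect as filtering out "" after regexp_extract
                counts[match.group(0)] += 1
    # Descending count, then alphabetical order to break ties deterministically.
    return sorted(counts.items(), key=lambda kv: (-kv[1], kv[0]))[:n]

sample = "The cat saw the dog.\nThe dog saw a cat!"
print(top_words(sample, 3))
```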
Question 2: Write an end-to-end program that orders channels by decreasing proportion of commercials.
Description: Program logging is a key element in the regulation and supervision of the Canadian broadcasting system. This important tool enables the Commission to verify the compliance of television licensees with regulations and conditions of licence, and to gather the information necessary for monitoring the impact of its policies on the broadcasting system, such as the Broadcasting and Telecom Regulatory Policy CRTC 2009–430 (http://www.crtc.gc.ca/eng/archive/2009/2009-430.htm).
BroadcastLogs
Broadcast logs submitted by the industry.
Licensees are required to keep, in a form acceptable to the Commission, a program log or a machine readable record of all of its programming. Licensees are also required to furnish to the Commission, within 30 days after the end of each month, the program log or machine readable record for that month.
[BroadcastLogID] [int] IDENTITY(1,1) NOT NULL,
[LogDate] [date] NOT NULL, -- date of log submission
[AudienceTargetAgeID] [int] NULL, -- see table CD_AudienceTargetAge
[AudienceTargetEthnicID] [int] NULL, -- see table CD_AudienceTargetEthnic
[CategoryID] [int] NULL, -- see table CD_Category
[ClosedCaptionID] [int] NULL, -- see table CD_ClosedCaption
[CountryOfOriginID] [smallint] NULL, -- see table CD_CountryOfOrigin
[DubDramaCreditID] [int] NULL, -- see table CD_DubDramaCredit
[EthnicProgramID] [int] NULL, -- see table CD_EthnicProgram
[ProductionSourceID] [int] NULL, -- see table CD_ProductionSource
[ProgramClassID] [int] NULL, -- see table CD_ProgramClass
[Duration] [time](7) NULL,
[EndTime] [time](7) NULL,
[ProductionNO] [varchar](20) NULL,
[ProgramTitle] [varchar](100) NULL,
[StartTime] [time](7) NOT NULL,
[Subtitle] [varchar](100) NULL,
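`Duration`, `StartTime` and `EndTime` are T-SQL `time(7)` values. Assuming they are exported to the CSV as strings such as `00:00:30.0000000` (HH:MM:SS followed by seven fractional digits), a duration can be turned into whole seconds by reading fixed character positions. The plain-Python sketch below shows that arithmetic for sanity-checking a few values; the function name and sample strings are hypothetical. In Spark, the same positions can be read with `Column.substr`, which is 1-based.

```python
def duration_to_seconds(duration: str) -> int:
    """Convert an 'HH:MM:SS.fffffff' string to whole seconds, ignoring the fraction.

    Assumes the fixed-width time(7) string layout described above.
    Python slices are 0-based: [0:2] hours, [3:5] minutes, [6:8] seconds.
    """
    hours = int(duration[0:2])
    minutes = int(duration[3:5])
    seconds = int(duration[6:8])
    return hours * 3600 + minutes * 60 + seconds

print(duration_to_seconds("00:00:30.0000000"))
print(duration_to_seconds("01:02:03.0000000"))
```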
CD_Category
Code table defining broadcast category.
The category code defines the nature of the broadcast program. The program categories are three characters in length, ranging from 010 to 150. All applicable categories can be found in the Television Broadcasting Regulations, 1987, the Specialty Services Regulations, 1990, and the Pay Television Regulations, 1990, for conventional services, specialty services and pay services, respectively.
[CategoryID] [int] IDENTITY(1,1) NOT NULL,
[CategoryCD] [char](3) NULL,
[ActiveFG] [bit] NOT NULL, -- flag indicating whether the code is active: 0 = false, 1 = true
[EnglishDescription] [nvarchar](255) NULL,
[FrenchDescription] [varchar](255) NULL,
CD_ProgramClass
Code table describing the program class of each log entry.
Programming class describes the nature of each log entry: whether it is a long-form program or a program segment, and which type of program it is (e.g. commercial, public service announcement, licensee identification). The field is three characters in length and is the first entry in each log line. Program classes also identify when a service is not airing programming, by logging OFF (Off Air) time.
[ProgramClassID] [INT] IDENTITY(1,1) NOT NULL, -- unique identifier
[ProgramClassCD] [CHAR](4) NOT NULL,
[ActiveFG] [BIT] NOT NULL, -- flag indicating whether the code is active: 0 = false, 1 = true
[EnglishDescription] [NVARCHAR](255) NULL,
[FrenchDescription] [NVARCHAR](255) NULL,
[EnglishShortDescription] [NVARCHAR](15) NULL,
[FrenchShortDescription] [NVARCHAR](15) NULL,
[DurationCalcDescription] [VARCHAR](30) NULL,
[DurationCalcShortDescription] [CHAR](2) NULL,
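`ProgramClassCD` is declared `CHAR(4)` while the codes themselves are three letters, so a value may come back space-padded (for example `"COM "`), and an exact comparison with `"COM"` would then fail. A small plain-Python illustration with hypothetical padded values; in PySpark, `F.trim` plays the role of `str.strip` before `isin`.

```python
# Hypothetical CHAR(4) values as they might come out of the export, space-padded.
raw_codes = ["COM ", "PGI ", "OFF ", "COM"]

# A subset of commercial codes, for illustration only.
commercial = {"COM", "PGI"}

untrimmed_hits = [code for code in raw_codes if code in commercial]
trimmed_hits = [code for code in raw_codes if code.strip() in commercial]

print(untrimmed_hits)  # ['COM']
print(trimmed_hits)    # ['COM ', 'PGI ', 'COM']
```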
CODE:
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Canadian TV channels with the highest/lowest proportion of commercials").getOrCreate()
logs = spark.read.csv("/FileStore/tables/BroadcastLogs_2018_Q3_M8.csv", sep="|", header=True, inferSchema=True)
log_identifier = spark.read.csv("/FileStore/tables/LogIndentifier.csv", sep="|", header=True, inferSchema=True)
cd_category = spark.read.csv(
    "/FileStore/tables/CD_Category.csv", sep="|", header=True, inferSchema=True
).select(
    "CategoryID",
    "CategoryCD",
    F.col("EnglishDescription").alias("Category_Description"),
)
cd_program_class = spark.read.csv(
    "/FileStore/tables/CD_ProgramClass.csv", sep="|", header=True, inferSchema=True
).select(
    "ProgramClassID",
    "ProgramClassCD",
    F.col("EnglishDescription").alias("ProgramClass_Description"),
)
# Data processing
# Drop identifier columns the analysis does not need
logs = logs.drop("BroadcastLogID", "SequenceNO")
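# Duration is read as a string such as "HH:MM:SS.fffffff" (T-SQL time(7)),
# so build the number of seconds from its fixed-position fields.
# (unix_timestamp would not give an elapsed duration here.)
logs = logs.withColumn(
    "duration_seconds",
    F.col("Duration").substr(1, 2).cast("int") * 60 * 60
    + F.col("Duration").substr(4, 2).cast("int") * 60
    + F.col("Duration").substr(7, 2).cast("int"),
)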
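# Keep only the rows flagged as primary (PrimaryFG == 1)
log_identifier = log_identifier.where(F.col("PrimaryFG") == 1)
# Attach the channel (LogIdentifierID) to each log line via LogServiceID
logs_and_channels = logs.join(log_identifier, "LogServiceID")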
# Left joins keep log lines whose category or program class has no match
full_logs = logs_and_channels.join(cd_category, "CategoryID", how="left").join(
    cd_program_class, "ProgramClassID", how="left"
)
# Sum commercial and total airtime per channel, then take their ratio
answer = (
    full_logs.groupBy("LogIdentifierID")
    .agg(
        F.sum(
            F.when(
                F.trim(F.col("ProgramClassCD")).isin(
                    ["COM", "PRC", "PGI", "PRO", "LOC", "SPO", "MER", "SOL"]
                ),
                F.col("duration_seconds"),
            ).otherwise(0)
        ).alias("duration_commercial"),
        F.sum("duration_seconds").alias("duration_total"),
    )
    .withColumn(
        "commercial_ratio", F.col("duration_commercial") / F.col("duration_total")
    )
    .fillna(0)
)
answer.orderBy("commercial_ratio", ascending=False).show(1000, False)
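To check the ordering logic on a handful of rows without a cluster, the `groupBy`/`agg` step can be mirrored in plain Python. This is a sketch on invented rows with hypothetical channel IDs, not the Spark job itself: every row adds its duration to the channel total, only rows whose trimmed `ProgramClassCD` is in the commercial list add to the commercial sum, and rows with no program class (what the left join yields when there is no match) count as non-commercial time. A zero total, which produces null in Spark's default non-ANSI mode and is then replaced by `fillna(0)`, becomes `0.0` here.

```python
from collections import defaultdict

# Program class codes treated as commercial time, as listed in the Spark job.
COMMERCIAL_CODES = {"COM", "PRC", "PGI", "PRO", "LOC", "SPO", "MER", "SOL"}

def commercial_ratios(rows):
    """Return {LogIdentifierID: commercial_ratio}, mirroring the aggregation above."""
    commercial = defaultdict(int)
    total = defaultdict(int)
    for row in rows:
        channel = row["LogIdentifierID"]
        seconds = row["duration_seconds"]
        code = (row["ProgramClassCD"] or "").strip()  # None for unmatched left-join rows
        if code in COMMERCIAL_CODES:
            commercial[channel] += seconds
        total[channel] += seconds
    # Spark gives null for x / 0 (non-ANSI); fillna(0) then turns it into 0.
    return {ch: (commercial[ch] / total[ch] if total[ch] else 0.0) for ch in total}

rows = [
    {"LogIdentifierID": "CHAN_A", "ProgramClassCD": "COM ", "duration_seconds": 30},
    {"LogIdentifierID": "CHAN_A", "ProgramClassCD": "PGR ", "duration_seconds": 90},  # hypothetical non-commercial code
    {"LogIdentifierID": "CHAN_B", "ProgramClassCD": None, "duration_seconds": 60},
]
ratios = commercial_ratios(rows)

# Order channels by decreasing proportion of commercials.
for channel, ratio in sorted(ratios.items(), key=lambda kv: kv[1], reverse=True):
    print(channel, ratio)
```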