Spark In Depth

shorya sharma
Dec 12, 2021


Data is increasing in volume, velocity, and variety. There is an explosion of information; wherever you look, data is everywhere. The need to process that data as quickly as possible has become more important than ever.

MapReduce has been valuable, but the time its jobs take to run is no longer acceptable in most situations. The learning curve for writing a MapReduce job is also steep, since it requires specific programming knowledge and expertise. In addition, MapReduce jobs only suit a narrow set of use cases; you need something that works for a wider range of problems.

SPARK

Apache Spark was designed as a computing platform to be fast, general-purpose, and easy to use.

  1. It extends the MapReduce model and takes it to a whole other level.
  2. The speed comes from in-memory computation. Applications running in memory allow for much faster processing and response. Spark is even faster than MapReduce for complex applications that work on disk.
  3. Its generality covers a wide range of workloads under one system. You can run batch applications such as MapReduce-type jobs, or iterative algorithms that build on each other. You can also run interactive queries and process streaming data within your application.

Spark's ease of use lets you pick it up quickly through simple APIs for Scala, Python, and Java. As mentioned, there are additional libraries you can use for SQL, machine learning, streaming, and graph processing.

Spark runs on Hadoop clusters under cluster managers such as Hadoop YARN or Apache Mesos, or even standalone with its own scheduler.

Like MapReduce, Spark provides parallel distributed processing, fault tolerance on commodity hardware, scalability, and so on. Spark adds to this with aggressively cached in-memory distributed computing, low latency, high-level APIs, and a stack of high-level tools. This saves time and money.

SPARK UNIFIED STACK

The Spark core is a general-purpose system providing scheduling, distributing, and monitoring of the applications across a cluster.

On top of the core sit components that are designed to interoperate closely, letting users combine them just as they would any libraries in a software project. The advantage of such a stack is that all the higher-level components inherit the improvements made at the lower layers.

The Spark core is designed to scale up from one to thousands of nodes. It can run over a variety of cluster managers, including Hadoop YARN and Apache Mesos, or it can simply run standalone with its own built-in scheduler.

Spark SQL is designed to work with Spark data via SQL and HiveQL (a Hive variant of SQL). It allows developers to intermix SQL queries with programmatic data manipulation in Python, Scala, and Java.
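As a rough illustration, here is a minimal PySpark sketch of mixing SQL with programmatic operations; the input file and column names (people.json, name, age) are hypothetical placeholders.

    from pyspark.sql import SparkSession

    # Minimal sketch: the input file and column names are hypothetical.
    spark = SparkSession.builder.appName("spark-sql-example").getOrCreate()

    # Register a DataFrame as a temporary view so it can be queried with SQL.
    people = spark.read.json("people.json")
    people.createOrReplaceTempView("people")

    # Intermix a SQL query with programmatic (DataFrame) operations.
    adults = spark.sql("SELECT name, age FROM people WHERE age >= 18")
    adults.filter(adults.age < 65).show()

    spark.stop()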

Spark Streaming provides processing of live streams of data. The Spark Streaming API closely matches the Spark Core API, making it easy for developers to move between applications that process data stored in memory and applications that process data arriving in real time. It also provides the same degree of fault tolerance, throughput, and scalability that Spark Core provides.

MLlib is Spark's machine learning library, providing multiple types of machine learning algorithms. All of these algorithms are designed to scale out across the cluster as well.

GraphX is a graph processing library with APIs to manipulate graphs and perform graph-parallel computations.

SPARK CLUSTER

There are three main components of a Spark cluster:

  1. You have the driver, where the SparkContext is located within the main program.
  2. To run on a cluster, you need some sort of cluster manager. This could be Spark's standalone cluster manager, Mesos, or YARN.
  3. Then you have your worker nodes, where the executors reside. The executors are the processes that run computations and store the data for the application.

The SparkContext sends the application code, defined as JAR or Python files, to each executor. Finally, it sends the tasks for each executor to run. There are several things to understand about this architecture. Each application gets its own executors, which stay up for the entire duration of the application. The benefit is that applications are isolated from each other, both on the scheduling side and because they run in separate JVMs. However, this means you cannot share data across applications directly; you would need to externalize the data if you wish to share it between different applications.

Spark applications do not care about the underlying cluster manager. As long as the application can acquire executors and they can communicate with each other, it can run on any cluster manager. Because the driver program schedules tasks on the cluster, it should run close to the worker nodes on the same local network. If you need to send remote requests to the cluster, it is better to use an RPC to the driver and have it submit operations from nearby. There are currently three supported cluster managers, as mentioned before: Spark comes with a standalone manager that you can use to get up and running; you can use Apache Mesos, a general cluster manager that can also run and service Hadoop jobs; and finally, you can use Hadoop YARN, the resource manager in Hadoop 2.
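To make the roles concrete, here is a minimal PySpark sketch of a driver program creating its SparkContext; the master URLs are illustrative placeholders, and local mode is used only so the sketch runs without a real cluster.

    from pyspark import SparkConf, SparkContext

    # Minimal sketch: the master URL is a placeholder.
    conf = (SparkConf()
            .setAppName("cluster-example")
            # On a standalone cluster: .setMaster("spark://<master-host>:7077")
            # On YARN you would typically submit with spark-submit --master yarn.
            .setMaster("local[*]"))

    # The driver (this program) holds the SparkContext; the cluster manager
    # allocates executors on worker nodes, which run tasks and store data.
    sc = SparkContext(conf=conf)
    print(sc.master)
    sc.stop()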

Resilient Distributed Dataset

Spark’s primary core abstraction is called Resilient Distributed Dataset or RDD. Essentially it is just a distributed collection of elements that is parallelized across the cluster.

There are two types of RDD operations: transformations and actions.

Transformations are operations that do not return a value. In fact, nothing is evaluated when a transformation is defined; Spark just builds a Directed Acyclic Graph (DAG), which is only evaluated at runtime. We call this lazy evaluation. The fault-tolerance aspect of RDDs allows Spark to replay the lineage of transformations recorded in the DAG to recompute any lost data.

Actions trigger the evaluation of the transformations recorded for that RDD, and they return values.

For example, you can call count on an RDD to get the number of elements within it, and that value is returned. The typical flow is: first load the dataset, from Hadoop for instance, then apply successive transformations on it such as filter or map. Nothing actually happens until an action is called; the DAG simply grows each time until then. This also provides fault tolerance: if a node goes offline, all Spark needs to do when it comes back is re-evaluate the graph from where it left off.
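A minimal PySpark sketch of this lazy evaluation, assuming a hypothetical local text file input.txt:

    from pyspark import SparkContext

    # Minimal sketch of lazy evaluation; the input path is hypothetical.
    sc = SparkContext("local[*]", "lazy-eval-example")

    lines = sc.textFile("input.txt")               # transformation: nothing runs yet
    errors = lines.filter(lambda l: "ERROR" in l)  # transformation: the DAG grows
    lengths = errors.map(len)                      # transformation: still nothing runs

    # Only the action below triggers evaluation of the whole DAG.
    print(lengths.count())

    sc.stop()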

Caching is provided with Spark to enable the processing to happen in memory. If it does not fit in memory, it will spill to disk.

An RDD is a fault-tolerant collection of elements that can be parallelized; in other words, it can be operated on in parallel. RDDs are also immutable. They are the essential units of data in Spark.

When RDDs are created, a directed acyclic graph (DAG) is created as well. Operations on RDDs are called transformations; transformations update that graph, but nothing actually happens until some action is called.

There are three methods for creating an RDD.

  1. You can parallelize an existing collection. This means that the data already resides within Spark and can now be operated on in parallel. For example, if you have an array of data, you can create an RDD out of it by calling the parallelize method, which returns a pointer to the RDD. This new distributed dataset can then be operated on in parallel throughout the cluster (see the sketch after this list).
  2. The second method is to reference a dataset. This dataset can come from any storage source supported by Hadoop, such as HDFS, Cassandra, HBase, Amazon S3, and so on.
  3. The third method is to transform an existing RDD to create a new RDD.
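Here is a minimal PySpark sketch of the three creation methods; the HDFS path is a hypothetical placeholder and is left commented out so the snippet runs locally.

    from pyspark import SparkContext

    # Minimal sketch of the three ways to create an RDD.
    sc = SparkContext("local[*]", "rdd-creation-example")

    # 1. Parallelize an existing collection.
    numbers = sc.parallelize([1, 2, 3, 4, 5])

    # 2. Reference an external dataset (HDFS, Cassandra, HBase, S3, ...).
    # lines = sc.textFile("hdfs:///data/input.txt")  # hypothetical path

    # 3. Transform an existing RDD into a new RDD.
    squares = numbers.map(lambda x: x * x)

    print(squares.collect())  # [1, 4, 9, 16, 25]
    sc.stop()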

RDD persistence

One of Spark's key capabilities is its speed through persisting, or caching. Each node stores the partitions of the cached dataset that it computes and keeps them in memory. When a subsequent action is called on the same dataset, or on a dataset derived from it, Spark uses the data from memory instead of having to recompute it. Future actions in such cases are frequently many times faster. The cache function is simply the persist function with the default MEMORY_ONLY storage level.

The first time an RDD is persisted, it is kept in memory on the nodes. Caching is fault tolerant: if any partition is lost, it is automatically recomputed using the transformations that originally created it.

There are two methods to invoke RDD persistence.

  1. persist()
  2. cache()

The persist() method allows you to specify a different storage level of caching. For example, you can choose to persist the data set on disk, persist it in memory but as serialized objects to save space, etc.

The cache() method is just the default way of using persistence, storing deserialized objects in memory.

There is an experimental storage level storing the serialized object in Tachyon. This level reduces garbage collection overhead and allows the executors to be smaller and to share a pool of memory.
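A minimal PySpark sketch of the two methods, assuming a hypothetical input file; cache() uses the default MEMORY_ONLY level, while persist() accepts an explicit storage level.

    from pyspark import SparkContext, StorageLevel

    # Minimal sketch of cache() vs persist(); the input path is hypothetical.
    sc = SparkContext("local[*]", "persistence-example")

    lines = sc.textFile("input.txt")
    lines.cache()  # shorthand for persist(StorageLevel.MEMORY_ONLY)

    # persist() lets you pick a different storage level, e.g. spilling to disk.
    words = lines.flatMap(lambda l: l.split())
    words.persist(StorageLevel.MEMORY_AND_DISK)

    print(lines.count())  # first action computes the RDD and caches it
    print(lines.count())  # subsequent actions read it from memory

    sc.stop()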

Spark provides two limited types of shared variables for common usage patterns:

  1. broadcast
  2. accumulators

Normally, when a function is passed from the driver to a worker, a separate copy of each variable is used by each worker. Broadcast variables instead let each machine work with a read-only variable cached locally. Spark attempts to distribute broadcast variables using efficient broadcast algorithms. As an example, broadcast variables can be used to give every node a copy of a large input dataset efficiently.

The other kind of shared variable is the accumulator. Accumulators are used for counters and sums that work well in parallel. These variables can only be added to through an associative operation. Only the driver can read an accumulator's value; the tasks can only add to it. Spark supports numeric types out of the box, but programmers can add support for new types.
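The sketch below shows both kinds of shared variable in PySpark; the lookup table and input values are made up for illustration.

    from pyspark import SparkContext

    # Minimal sketch of broadcast variables and accumulators; data is made up.
    sc = SparkContext("local[*]", "shared-variables-example")

    # Broadcast: a read-only lookup table shipped efficiently to every node.
    lookup = sc.broadcast({"a": 1, "b": 2, "c": 3})

    # Accumulator: tasks can only add to it; only the driver reads its value.
    missing = sc.accumulator(0)

    def resolve(key):
        if key not in lookup.value:
            missing.add(1)
            return 0
        return lookup.value[key]

    data = sc.parallelize(["a", "b", "x", "c", "y"])
    total = data.map(resolve).reduce(lambda x, y: x + y)

    print(total)          # 6
    print(missing.value)  # 2, read on the driver only

    sc.stop()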

CONFIGURATION, MONITORING AND TUNING

There are three main locations for Spark configuration:

  1. You have the Spark properties, where application parameters can be set using the SparkConf object or through Java system properties (see the sketch after this list).
  2. Then you have the environment variables, which can be used to set per-machine settings such as the IP address. This is done through the conf/spark-env.sh script on each node.
  3. You also have the logging properties, which can be configured through log4j.properties. You can choose to override the default configuration directory, which is SPARK_HOME/conf, by setting the SPARK_CONF_DIR environment variable and providing your custom configuration files under that directory.
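For the first of these, here is a minimal PySpark sketch of setting Spark properties through SparkConf; the property values shown are illustrative, not tuning recommendations.

    from pyspark import SparkConf, SparkContext

    # Minimal sketch: property values here are illustrative only.
    conf = (SparkConf()
            .setAppName("config-example")
            .setMaster("local[*]")
            .set("spark.executor.memory", "2g")
            .set("spark.logConf", "true"))  # log the effective config at startup

    sc = SparkContext(conf=conf)
    print(sc.getConf().get("spark.executor.memory"))  # "2g"
    sc.stop()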

There are three ways to monitor Spark applications.

  1. The first is the Web UI, which runs on port 4040 by default. The information on this UI is available only for the duration of the application. To view the data afterwards, set spark.eventLog.enabled to true before starting the application; the information will then be persisted to storage as well.
  2. Metrics are another way to monitor Spark applications. The metrics system is based on the Coda Hale Metrics Library and can be customized to report to a variety of sinks, such as CSV. You configure it in the metrics.properties file under the conf directory.
  3. Finally, you can also use external instrumentation to monitor Spark. Ganglia can be used to view overall cluster utilization and resource bottlenecks, and various OS profiling tools and JVM utilities can also be used to observe Spark.

Spark programs can be bottlenecked by any resource in the cluster. Because of Spark's in-memory nature, data serialization and memory tuning are two areas that will improve performance.

Data serialization is crucial for network performance and to reduce memory use. It is often the first thing you should look at when tuning Spark applications. Spark provides two serialization libraries.

  1. Java serialization provides a lot more flexibility, but it is quite slow and leads to large serialized objects. This is the default library that Spark uses to serialize objects.
  2. Kryo serialization is much faster than Java serialization, but it does not support all Serializable types and requires you to register the types you use in advance for best performance. To use Kryo serialization, you set it through the SparkConf object (see the sketch below).
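A minimal PySpark sketch of switching the serializer to Kryo via SparkConf; registering custom classes is primarily a Scala/Java concern and is omitted here.

    from pyspark import SparkConf, SparkContext

    # Minimal sketch: enable Kryo serialization for the JVM side of Spark.
    conf = (SparkConf()
            .setAppName("kryo-example")
            .setMaster("local[*]")
            .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"))

    sc = SparkContext(conf=conf)
    print(sc.getConf().get("spark.serializer"))
    sc.stop()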

With memory tuning, you have to consider three things: the amount of memory used by your objects (and whether you want the entire dataset to fit in memory), the cost of accessing those objects, and the overhead of garbage collection.
