1. The need for data unification
Data unification refers to the process of combining and merging data from multiple sources into a single, unified view. It involves identifying and resolving inconsistencies and errors in the data, then mapping and transforming data from different sources so that the result is consistent and usable. Data unification is a critical part of CRM, ERP, and CDP systems.
Imagine an organization that processes customer information from multiple sources to generate insights (a customer 360-degree view) that help its customers make better business decisions and act on them, for example:
- Contact data source: stores customer information in an on-premises database.
- Customer activity data source: stores customers’ behaviors and activities in an e-commerce platform.
- Payment data source: stores customers’ purchase history and sales funnel status in the Salesforce CRM platform.
Data unification aims to create a single source of truth for an organization’s data, which can improve data quality, enable better decision-making, and reduce the time and effort needed to access and analyze data.
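To make this concrete, here is a minimal plain-Python sketch, not tied to any particular platform, of how records from the three kinds of sources above could be combined into a single customer view. All records and field names here are hypothetical illustrations.

```python
# Minimal sketch of unifying records from several sources into one view.
# The records, field names, and matching key (email) are all hypothetical.

def unify_by_email(contacts, activities, payments):
    """Build a simple customer 360 view keyed by email address."""
    unified = {}
    for contact in contacts:
        view = dict(contact)
        view["activities"] = []
        view["total_spent"] = 0
        unified[contact["email"]] = view
    for activity in activities:
        if activity["email"] in unified:
            unified[activity["email"]]["activities"].append(activity["type"])
    for payment in payments:
        if payment["email"] in unified:
            unified[payment["email"]]["total_spent"] += payment["amount"]
    return unified

contacts = [{"email": "a@x.com", "firstName": "Ann"}]
activities = [{"email": "a@x.com", "type": "click_email"}]
payments = [{"email": "a@x.com", "amount": 50}, {"email": "a@x.com", "amount": 20}]
unified = unify_by_email(contacts, activities, payments)
print(unified["a@x.com"]["total_spent"])  # 70
```

Even this toy version shows why unification gets hard at scale: every source must agree on a matching key, and every lookup multiplies with the data volume.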
Data unification is very much a Big Data task. There are many challenges to deal with when implementing a unified data platform. For example:
- When you deal with customer data, you must ensure security and compliance.
- When calculating the total amount a customer has purchased, you may have to query purchase transactions stored in a MySQL table containing more than 100 million records.
- Users rarely have time to wait for a long computation to finish; they may even expect real-time data unification.
These challenges are difficult to handle with conventional programming techniques, such as writing a Java service that queries data from multiple data sources and unifies them via application logic. That is why we should choose a unified platform like Apache Spark, which integrates with a large number of systems and has the power to process large volumes of data, so that you can unify massive amounts of business data. Let's dig into Spark in the next chapter.
2. Spark introduction
2.1. What is Apache Spark?
The major problem of Big Data is dealing with large computations over large volumes of data. One particularly challenging area is data processing: a single machine does not have enough power and resources to perform computations on huge amounts of information (or the user does not have the time to wait for the computation to finish).
A cluster (or group of computers) pools the resources of many machines together, allowing us to use all the cumulative resources as if they were a single computer. However, a group of machines alone is not powerful; you need a framework to coordinate work across them. Spark does just that, managing and coordinating the execution of tasks on data across a cluster of computers (or nodes).
Apache Spark is a unified computing engine and a set of libraries for parallel data processing on
computer clusters. Spark supports multiple widely used programming languages (Python, Java, Scala, and R), and includes libraries for diverse tasks ranging from SQL to streaming and machine learning.
Spark plays a huge part in companies solving massive-scale data challenges, from technology companies like Uber and Netflix using Spark’s streaming and machine-learning tools, to institutions like NASA, CERN, and the Broad Institute of MIT and Harvard applying Spark to scientific data analysis.
2.2. Spark concepts
2.2.1. RDD
The Resilient Distributed Dataset (RDD) is the fundamental data structure of Spark (the backbone of PySpark). RDDs are immutable, distributed collections of objects of any type that can reside on multiple nodes, which Spark uses to achieve faster and more efficient MapReduce-style operations.
2.2.2. DataFrame
Spark has several core abstractions: Datasets, DataFrames, SQL Tables, and Resilient Distributed Datasets (RDDs). These different abstractions all represent distributed collections of data. The easiest and most efficient are DataFrames, which are available in all languages.
A DataFrame is the most common Structured API and represents a table of data with rows and
columns. The list that defines the columns and the types within those columns is called the schema.
You can think of a DataFrame as a spreadsheet with named columns.
The image below illustrates the fundamental difference: a spreadsheet sits on one computer in one specific location, whereas a Spark DataFrame can span thousands of computers. The reason for putting the data on more than one computer should be intuitive: either the data is too large to fit on one machine or it would simply take too long to perform that computation on one machine.
2.2.3. Spark Session
We control a Spark Application through a driver process called the SparkSession. The SparkSession instance is the way Spark executes user-defined manipulations across the cluster, and there is a one-to-one correspondence between a SparkSession and a Spark Application. In the Python console, the variable is available as spark; below is a first Spark example that creates a SparkSession explicitly and connects to a cluster running locally.
from pyspark import SparkConf
from pyspark.sql import SparkSession

conf = SparkConf() \
    .setMaster("spark://localhost:7077") \
    .setAppName("Hello") \
    .set("spark.dynamicAllocation.enabled", False) \
    .set("spark.driver.maxResultSize", 0)

my_spark = SparkSession \
    .builder \
    .config(conf=conf) \
    .getOrCreate()

# create a DataFrame with a single "number" column holding 0..999
number_df = my_spark.range(1000).toDF("number")
2.2.4. Overview of Structured API Execution
How is your code actually executed across a Spark cluster?
First, we write code. This code is then submitted to Spark, either through the console or via a submitted job. It then passes through the Catalyst Optimizer, which decides how the code should be executed (the physical plan) and lays out a plan for doing so; finally, the code is run and the result is returned to the user.
3. Data unification using PySpark
3.1. Introduction
We will create a sample project to demonstrate how to implement data unification using Spark. This project simulates a Customer Data Platform with three databases:
- nht_contact: a MongoDB database containing two collections:
- contact: stores customer information including firstName, lastName, email, phone, gender,…
- contact_activity: stores customer activities (clicking an email, accessing a website, sending a message,…)
- nht_payment: a MySQL database containing two tables:
- product: stores product data
- transaction: stores transactions between customers and products.
- nht_unified_contact: a MongoDB database to store the unified contacts
The requirement of this project is to implement a “unify service” which creates a single customer 360-degree view unified from the nht_contact and nht_payment data sources above. This service takes a “primary_key” parameter: a set of fields in the nht_contact.contact collection that identifies which customer attributes are used to match the data. When merging the data, the latest data is prioritized.
Below is a data unification example with three contacts, their activities, and payment data. After unifying with primary_key = [email, first_name], two unified contacts will be created:
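The matching rule just described can be sketched in plain Python, independent of Spark. The sample records and the updated_at field below are illustrative assumptions; the rule itself, group by the primary_key fields and let the latest data win, is the one the service implements.

```python
# Plain-Python sketch of the matching rule: contacts sharing the same values
# for every primary_key field are merged into one unified contact, and for
# conflicting fields the latest record wins. Sample data is hypothetical.

def unify(contacts, primary_key):
    groups = {}
    # process records oldest-first so later updates overwrite earlier values
    for contact in sorted(contacts, key=lambda c: c["updated_at"]):
        key = tuple(contact[field] for field in primary_key)
        merged = groups.setdefault(key, {})
        merged.update(contact)  # the latest data wins
    return list(groups.values())

contacts = [
    {"email": "a@x.com", "first_name": "Ann", "phone": "111", "updated_at": 1},
    {"email": "a@x.com", "first_name": "Ann", "phone": "222", "updated_at": 2},
    {"email": "b@x.com", "first_name": "Bob", "phone": "333", "updated_at": 1},
]
unified = unify(contacts, primary_key=["email", "first_name"])
print(len(unified))         # 2 unified contacts
print(unified[0]["phone"])  # "222" -- the latest value won
```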
PySpark will be used to interact with the Spark cluster via the Spark MongoDB Connector, so you need to understand the fundamentals of the Python programming API. Because setting up Spark locally is complex, we will use Docker to build the entire local environment, so you need Docker installed on your machine and a basic understanding of Docker containerization.
The high-level architecture of this system is as below.
3.2. Build up Spark locally with Docker
We will use Docker Compose to build the local environment including:
- A Spark cluster with two workers
- nht_contact Database (MongoDB)
- nht_payment Database (MySQL)
- nht_unified_contact Database (MongoDB)
The unify service also needs to be containerized into a Docker image in order to run the entire system in Docker. Below is the content of the Dockerfile. Note that Spark requires a Java runtime environment, so we need to install Java as well.
FROM python:3.8.2-buster
# Install OpenJDK-11
RUN apt-get update && \
    apt-get install -y openjdk-11-jre-headless && \
    apt-get clean
# Install PYTHON requirements
WORKDIR /srv
ADD ./requirements.txt /srv/requirements.txt
RUN pip install -r requirements.txt
ADD . /srv
CMD python /srv/app.py
We can containerize the unify service into a Docker image with the command below.
docker build -t unify-service .
The docker-compose file is as below. We use the “depends_on” attribute to make sure the unify-service container starts after the other services are ready. We also create sample data to demonstrate the unification, including:
- nht_contact.contact: 100 documents
- nht_contact.contact_activity: 1000 documents
- nht_payment.product: 18 records
- nht_payment.transaction: 1000 transactions
version: '3'
services:
  spark-master:
    image: bde2020/spark-master:3.3.0-hadoop3.3
    container_name: spark-master
    ports:
      - "8083:8080"
      - "7077:7077"
    environment:
      - INIT_DAEMON_STEP=setup_spark
  spark-worker-1:
    image: bde2020/spark-worker:3.3.0-hadoop3.3
    container_name: spark-worker-1
    depends_on:
      - spark-master
    ports:
      - "8084:8081"
    environment:
      - "SPARK_MASTER=spark://spark-master:7077"
  spark-worker-2:
    image: bde2020/spark-worker:3.3.0-hadoop3.3
    container_name: spark-worker-2
    depends_on:
      - spark-master
    ports:
      - "8085:8081"
    environment:
      - "SPARK_MASTER=spark://spark-master:7077"
  contact-mongo-db:
    ports:
      - '27017:27017'
    container_name: contact-mongo-db
    restart: always
    logging:
      options:
        max-size: 1g
    environment:
      MONGO_INITDB_ROOT_USERNAME: nht
      MONGO_INITDB_ROOT_PASSWORD: abc123
      MONGO_INITDB_DATABASE: nht_contact
    image: mongo:6.0-focal
    volumes:
      - ./init_mongo_contact/mongo-init.js:/docker-entrypoint-initdb.d/mongo-init.js:ro
    healthcheck:
      test: "exit 0"
  unify-mongo-db:
    ports:
      - '27018:27017'
    container_name: unify-mongo-db
    restart: always
    logging:
      options:
        max-size: 1g
    environment:
      MONGO_INITDB_ROOT_USERNAME: nht
      MONGO_INITDB_ROOT_PASSWORD: abc123
      MONGO_INITDB_DATABASE: nht_unified_contact
    image: mongo:6.0-focal
    volumes:
      - ./init_mongo_unify/mongo-init.js:/docker-entrypoint-initdb.d/mongo-init.js:ro
    healthcheck:
      test: "exit 0"
  payment-mysql-db:
    container_name: payment-mysql-db
    platform: linux/x86_64
    image: mysql:5.7
    restart: always
    environment:
      MYSQL_USER: 'nht'
      MYSQL_PASSWORD: 'abc123'
      MYSQL_ROOT_PASSWORD: 'abc123'
    ports:
      - '3316:3306'
    expose:
      - '3306'
    volumes:
      - ./init_mysql_payment/:/docker-entrypoint-initdb.d
    healthcheck:
      test: "exit 0"
  unify-service:
    image: unify-service
    depends_on:
      spark-worker-1:
        condition: service_started
      spark-worker-2:
        condition: service_started
      contact-mongo-db:
        condition: service_healthy
      unify-mongo-db:
        condition: service_healthy
      payment-mysql-db:
        condition: service_healthy
3.3. Project structure
The project structure is quite simple and easy to understand. app.py is a Python application that implements the whole logic of the unification process; we will dive deeper into it in the next sections.
3.4. Load contact data to Spark data frame
We will create a run_unify method that runs the unification with a primary_key parameter, which is a list of contact fields we want to match on.
We will create a Spark Session connected to the Spark master built via Docker above. This Spark Session will connect to the nht_contact database as the input database and nht_unified_contact as the output database using the Spark MongoDB Connector. Finally, we load all documents in the “contact” collection into a Spark data frame to kick off the unification process.
def run_unify(primary_key):
    conf = SparkConf().setMaster("spark://spark-master:7077").setAppName("Unify") \
        .set("spark.dynamicAllocation.enabled", False) \
        .set("spark.driver.maxResultSize", "3g") \
        .set("spark.driver.memoryOverhead", "3g") \
        .set("spark.executor.memory", "3g")
    # initiate Spark session with the contact database as input and the unified contact database as output
    spark = SparkSession.builder \
        .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.1") \
        .config("spark.mongodb.input.uri",
                "mongodb://nht:abc123@contact-mongo-db:27017/nht_contact.contact?connectTimeoutMS=10000&authSource=admin&authMechanism=SCRAM-SHA-256") \
        .config("spark.mongodb.output.uri",
                "mongodb://nht:abc123@unify-mongo-db:27017/nht_unified_contact.unified_contact?connectTimeoutMS=10000&authSource=admin&authMechanism=SCRAM-SHA-256") \
        .config(conf=conf) \
        .getOrCreate()
    # load contact data into a data frame
    df = spark.read.format("com.mongodb.spark.sql.DefaultSource").load(schema=contact_schema)
    ...
3.5. Unify contact data
After loading the contact data into a Spark data frame, we can use the groupby function to group these contacts by primary_key (1). Then we load the contact_activity data into a data frame, because this collection is in the same Spark input database (2). Payment transaction data is loaded via the Python MySQL connector (3). Finally, we perform the merge logic to ensure that the latest data is prioritized, and combine the contact_activity data and payment data into the newly created unified contact (4).
# 1. group contact data frame by primary key
columns = [func.collect_list(x).alias(x) for x in df.columns]
raw_unified_df = df.groupby(primary_key).agg(*columns)
unified_contacts = []
# 2. load contact activities into a data frame
contact_activities = spark.read.format("com.mongodb.spark.sql.DefaultSource") \
    .option("collection", "contact_activity") \
    .load()
# 3. load transactions from the payment MySQL database
transactions = get_transactions()
# 4. merge data to create unified contacts
for row in raw_unified_df.rdd.collect():
    unified_contact = {}
    for attr, columns in row.__dict__.items():
        for column in columns:
            if column == "_id":
                column_values = []
                for items in row[column]:
                    if items[0] is not None:
                        column_values.append(items[0])
                # store the ids of the contacts used to create this unified contact
                unified_contact["contact_ids"] = column_values
            else:
                # merge data using the latest value
                unified_contact[column] = row[column][-1]
    enrich_contact_activities(contact_activities, unified_contact)
    enrich_contact_payment(transactions, unified_contact)
    unified_contacts.append(unified_contact)
3.6. Store unified contacts
The newly created unified contacts need to be converted back into a data frame in order to utilize Spark’s power. We use the repartition method to distribute the workload of storing contact data across the Spark workers, which is an important step.
# store unified contact to unified_contact database
unified_contacts_df = spark.createDataFrame(unified_contacts, unified_contact_schema)
unified_contacts_df = unified_contacts_df.repartition(100)
unified_contacts_df.write.format("mongo").mode("append").save()
spark.stop()
4. Conclusion
We’ve walked through the fundamental concepts of data unification, and how it delivers value to the business by creating a unified view of data from multiple sources. We’ve also looked at an introduction to Spark, including its basic components and APIs, to understand how Spark’s power enables us to implement the data unification process.
Most notably, we’ve implemented a complete project: a simple unify service that can be run in a containerized environment using Docker. This is not an easy task, due to the complexity of setting up Spark, and building Spark into a CI/CD pipeline is a very interesting topic to dive deeper into. Another state-of-the-art approach is to run Spark on a cloud computing platform such as AWS EMR to reduce the development effort. The demo project can be found on GitHub.