Friday 17th May 2024
Ho Chi Minh, Vietnam

1. The need for data unification

Data unification refers to the process of combining and merging data from multiple sources into a single, unified view. This process involves identifying and resolving inconsistencies and errors in the data, and mapping and transforming data from different sources to ensure that it is consistent and usable. Data unification is a critical process in CRM, ERP, and CDP systems.

Imagine an organization that processes customer information from multiple sources to generate insights (a customer 360-degree view) that help its customers make better business decisions and act on them, for example:

  • Contact data source: stores customer information in an on-premises database.
  • Customer activity data source: stores customers’ behaviors and activities in an e-commerce platform.
  • Payment data source: stores customers’ purchase history and sales funnel status in the Salesforce CRM platform.

Data unification aims to create a single source of truth for an organization’s data, which can improve data quality, enable better decision-making, and reduce the time and effort needed to access and analyze data.

Data unification is definitely a Big Data task. There are a lot of challenges you will have to deal with in order to implement a unified data platform. For example:

  • When you deal with customer data, you must ensure security and compliance.
  • When calculating the total amount that a customer has purchased, you must query purchase transactions stored in a MySQL table containing more than 100 million records.
  • Users probably do not have the time to wait for the computation to finish; they may even expect real-time data unification.

These challenges are difficult to handle using typical programming techniques, such as creating a Java service that queries data from multiple data sources and unifies them via application logic. That is why we should choose a unified platform like Apache Spark, which supports integration with a large number of systems and has the power to process large volumes of data, so you can unify a massive amount of business data. Let's dig into Spark in the next chapter.

2. Spark introduction

2.1. What is Apache Spark?

The major problem of Big Data is dealing with large computations on large volumes of data. One particularly challenging area is data processing. Single machines do not have enough power and resources to perform computations on huge amounts of information (or the user probably does not have the time to wait for the computation to finish).

A cluster (or group) of computers pools the resources of many machines together, allowing us to use all the cumulative resources as if they were a single computer. Now, a group of machines alone is not powerful; you need a framework to coordinate work across them. Spark does just that, managing and coordinating the execution of tasks on data across a cluster of computers (or nodes).

Apache Spark is a unified computing engine and a set of libraries for parallel data processing on
computer clusters. Spark supports multiple widely used programming languages (Python, Java, Scala, and R), and includes libraries for diverse tasks ranging from SQL to streaming and machine learning.

Spark is a huge part of how companies solve massive-scale data challenges, from technology companies like Uber and Netflix using Spark’s streaming and machine-learning tools, to institutions like NASA, CERN, and the Broad Institute of MIT and Harvard applying Spark to scientific data analysis.

2.2. Spark concepts

2.2.1. RDD

The Resilient Distributed Dataset (RDD) is the fundamental data structure of Spark (or the backbone of PySpark). RDDs are immutable, distributed collections of objects of any type that can reside on multiple nodes, which Spark uses to achieve faster and more efficient MapReduce operations.
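
To make this concrete, below is a minimal sketch of creating and operating on an RDD directly, assuming a SparkSession named spark already exists (creating one is covered in the Spark Session section below):

# Obtain the low-level SparkContext from an existing SparkSession
sc = spark.sparkContext

# Distribute a local Python list across the cluster as an RDD
numbers = sc.parallelize([1, 2, 3, 4, 5])

# Transformations such as map are lazy; the reduce action triggers execution
squares = numbers.map(lambda x: x * x)
total = squares.reduce(lambda a, b: a + b)  # 1 + 4 + 9 + 16 + 25 = 55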

2.2.2. DataFrame

Spark has several core abstractions: Datasets, DataFrames, SQL Tables, and Resilient Distributed Datasets (RDDs). These different abstractions all represent distributed collections of data. The easiest and most efficient are DataFrames, which are available in all languages.

A DataFrame is the most common Structured API and represents a table of data with rows and columns. The list that defines the columns and the types within those columns is called the schema. You can think of a DataFrame as a spreadsheet with named columns.

The image below illustrates the fundamental difference: a spreadsheet sits on one computer in one specific location, whereas a Spark DataFrame can span thousands of computers. The reason for putting the data on more than one computer should be intuitive: either the data is too large to fit on one machine or it would simply take too long to perform that computation on one machine.
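
As a minimal sketch, a DataFrame with an explicit schema can be created from local data like this (again assuming an existing SparkSession named spark; the column names are made up for illustration):

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# The schema lists the column names and their types
schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
])

people_df = spark.createDataFrame([("An", 30), ("Binh", 25)], schema)
people_df.printSchema()
people_df.show()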

2.2.3. Spark Session

We control a Spark Application through a driver process called the SparkSession. The SparkSession instance is the way Spark executes user-defined manipulations across the cluster. There is a one-to-one correspondence between a SparkSession and a Spark Application. In Python, the variable is available as spark when you start the console; below is a first Spark example that creates a session explicitly and runs locally.

from pyspark import SparkConf
from pyspark.sql import SparkSession

conf = SparkConf().setMaster("spark://localhost:7077").setAppName("Hello") \
    .set("spark.dynamicAllocation.enabled", False) \
    .set("spark.driver.maxResultSize", 0)

my_spark = SparkSession \
    .builder \
    .config(conf=conf) \
    .getOrCreate()

number_df = my_spark.range(1000).toDF("number")

You just ran your first Spark code! We created a DataFrame with one column containing 1,000 rows with values from 0 to 999. This range of numbers represents a distributed collection. When run on a cluster, each part of this range of numbers exists on a different executor. This is a Spark DataFrame.
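
As a quick follow-up sketch, we can apply a transformation and an action to that DataFrame to see distributed execution in practice:

# Transformation: keep only the even numbers (lazy, nothing runs yet)
even_numbers = number_df.where("number % 2 = 0")

# Action: triggers the distributed computation and returns the result to the driver
print(even_numbers.count())  # 500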

2.2.4. Overview of Structured API Execution

How is your code actually executed across a Spark cluster?

To execute code, we must first write it. This code is then submitted to Spark, either through the console or via a submitted job. The code then passes through the Catalyst Optimizer, which decides how the code should be executed (the physical plan) and lays out a plan for doing so before, finally, the code is run and the result is returned to the user.
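
One way to peek at the plan the Catalyst Optimizer produces is to call explain() on a DataFrame; a minimal sketch reusing the number_df DataFrame from the earlier example:

# Build a small logical plan: filter the even numbers, then sum them
result = number_df.where("number % 2 = 0").selectExpr("sum(number)")

# Print the physical plan Spark will execute; nothing runs until an action is called
result.explain()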

3. Data unification using PySpark

3.1. Introduction

We will create a sample project to demonstrate how to implement data unification using Spark. This project simulates a Customer Data Platform with three databases:

  • nht_contact: a MongoDB database containing two collections:
    • contact: stores customer information including firstName, lastName, email, phone, gender,…
    • contact_activity: stores customer activities (clicking an email, accessing a website, sending a message,…)
  • nht_payment: a MySQL database containing two tables:
    • product: stores product data
    • transaction: stores transactions between customers and products
  • nht_unified_contact: a MongoDB database that stores the unified contacts

The requirement of this project is to implement a “unify service” which creates a single customer 360-degree view unified from the nht_contact and nht_payment data sources above. This service requires a “primary_key” parameter: a set of fields in the nht_contact.contact collection that identifies which customer attributes will be used to match the data. When merging the data, the latest data will be prioritized.

Below is a data unification example with 3 contacts and their activities and payment data. After unifying with primary_key = [email, first_name], two unified contacts will be created:
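
To make the matching rule concrete, here is a minimal plain-Python sketch (with made-up contact values) of how contacts sharing the same primary_key collapse into one unified contact, with the latest values winning:

contacts = [
    {"email": "an@example.com", "first_name": "An", "phone": "111", "updated": 1},
    {"email": "an@example.com", "first_name": "An", "phone": "222", "updated": 2},
    {"email": "binh@example.com", "first_name": "Binh", "phone": "333", "updated": 1},
]
primary_key = ["email", "first_name"]

# Group by primary_key; later (newer) records overwrite earlier ones
unified = {}
for contact in sorted(contacts, key=lambda c: c["updated"]):
    key = tuple(contact[field] for field in primary_key)
    unified.setdefault(key, {}).update(contact)

print(len(unified))  # 2 unified contacts
print(unified[("an@example.com", "An")]["phone"])  # "222" – the latest value wins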

PySpark will be used to interact with the Spark cluster via the Spark MongoDB Connector, so you need to understand the fundamentals of the Python programming API. Due to the complexity of setting up Spark locally, we will use Docker to build the entire local environment, so you need to install Docker on your local machine and understand the basic concepts of Docker containerization.

The high-level architecture of this system is as below.

3.2. Build up Spark locally with Docker

We will use Docker Compose to build the local environment including:

  • A Spark cluster with two workers
  • nht_contact Database (MongoDB)
  • nht_payment Database (MySQL)
  • unified_contact Database (MongoDB)

The unify service also needs to be containerized into a Docker image in order to run the entire system in Docker. Below is the content of the Dockerfile. Note that Spark requires a Java runtime environment, so we need to install Java as well.

FROM python:3.8.2-buster

# Install OpenJDK-11
RUN apt-get update && \
    apt-get install -y openjdk-11-jre-headless && \
    apt-get clean;

# Install PYTHON requirements
WORKDIR /srv
ADD ./requirements.txt /srv/requirements.txt
RUN pip install -r requirements.txt
ADD . /srv
CMD python /srv/app.py

We can containerize the unify service into a Docker image with the command below.

docker build -t unify-service .

The docker-compose file is as below. We use the “depends_on” attribute to make sure the unify-service container runs after the other services are ready. We also create sample data to demonstrate the unification, including:

  • nht_contact.contact: 100 documents
  • nht_contact.contact_activity: 1000 documents
  • nht_payment.product: 18 records
  • nht_payment.transaction: 1000 transactions

version: '3'
services:
  spark-master:
    image: bde2020/spark-master:3.3.0-hadoop3.3
    container_name: spark-master
    ports:
      - "8083:8080"
      - "7077:7077"
    environment:
      - INIT_DAEMON_STEP=setup_spark
  spark-worker-1:
    image: bde2020/spark-worker:3.3.0-hadoop3.3
    container_name: spark-worker-1
    depends_on:
      - spark-master
    ports:
      - "8084:8081"
    environment:
      - "SPARK_MASTER=spark://spark-master:7077"
  spark-worker-2:
    image: bde2020/spark-worker:3.3.0-hadoop3.3
    container_name: spark-worker-2
    depends_on:
      - spark-master
    ports:
      - "8085:8081"
    environment:
      - "SPARK_MASTER=spark://spark-master:7077"
  contact-mongo-db:
    ports:
      - '27017:27017'
    container_name: contact-mongo-db
    restart: always
    logging:
      options:
        max-size: 1g
    environment:
       MONGO_INITDB_ROOT_USERNAME: nht
       MONGO_INITDB_ROOT_PASSWORD: abc123
       MONGO_INITDB_DATABASE: nht_contact
    image: mongo:6.0-focal
    volumes:
      - ./init_mongo_contact/mongo-init.js:/docker-entrypoint-initdb.d/mongo-init.js:ro
    healthcheck:
      test: "exit 0"
  unify-mongo-db:
    ports:
      - '27018:27017'
    container_name: unify-mongo-db
    restart: always
    logging:
      options:
        max-size: 1g
    environment:
      MONGO_INITDB_ROOT_USERNAME: nht
      MONGO_INITDB_ROOT_PASSWORD: abc123
      MONGO_INITDB_DATABASE: nht_unified_contact
    image: mongo:6.0-focal
    volumes:
      - ./init_mongo_unify/mongo-init.js:/docker-entrypoint-initdb.d/mongo-init.js:ro
    healthcheck:
      test: "exit 0"
  payment-mysql-db:
    container_name: payment-mysql-db
    platform: linux/x86_64
    image: mysql:5.7
    restart: always
    environment:
      MYSQL_USER: 'nht'
      MYSQL_PASSWORD: 'abc123'
      MYSQL_ROOT_PASSWORD: 'abc123'
    ports:
      - '3316:3306'
    expose:
      - '3306'
    volumes:
      - ./init_mysql_payment/:/docker-entrypoint-initdb.d
    healthcheck:
      test: "exit 0"
  unify-service:
    image: unify-service
    depends_on:
      spark-worker-1:
        condition: service_started
      spark-worker-2:
        condition: service_started
      contact-mongo-db:
        condition: service_healthy
      unify-mongo-db:
        condition: service_healthy
      payment-mysql-db:
        condition: service_healthy
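
Assuming the compose content above is saved as docker-compose.yml next to the Dockerfile, the whole stack can be built and started with the commands below; the Spark master UI is then reachable on port 8083 (mapped from the container’s port 8080).

docker build -t unify-service .
docker-compose up -d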

3.3. Project structure

The project structure is quite simple and easy to understand. app.py is a Python application that implements the whole logic of the unification process; we will dive deeper into it in the next chapters.

3.4. Load contact data to Spark data frame

We will create the run_unify method to run the unification. It takes a primary_key parameter, which is a list of contact fields we want to match on.

We will create a Spark Session with the Spark Master built up via Docker above. This Spark Session will connect to the nht_contact database as the input database and nht_unified_contact as the output database using the Spark MongoDB Connector. Finally, we will load all documents in the “contact” collection into a Spark data frame to kick off the unification process.

from pyspark import SparkConf
from pyspark.sql import SparkSession


def run_unify(primary_key):
    conf = SparkConf().setMaster("spark://spark-master:7077").setAppName("Unify") \
        .set("spark.dynamicAllocation.enabled", False) \
        .set("spark.driver.maxResultSize", "3g") \
        .set("spark.driver.memoryOverhead", "3g") \
        .set("spark.executor.memory", "3g")
    # initiate a Spark session with the contact database as input and the unified contact database as output
    spark = SparkSession.builder \
        .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.1") \
        .config("spark.mongodb.input.uri",
                "mongodb://nht:abc123@contact-mongo-db:27017/nht_contact.contact?connectTimeoutMS=10000&authSource=admin&authMechanism=SCRAM-SHA-256") \
        .config("spark.mongodb.output.uri",
                "mongodb://nht:abc123@unify-mongo-db:27017/nht_unified_contact.unified_contact?connectTimeoutMS=10000&authSource=admin&authMechanism=SCRAM-SHA-256") \
        .config(conf=conf) \
        .getOrCreate()

    # load contact data to data frame
    df = spark.read.format("com.mongodb.spark.sql.DefaultSource").load(schema=contact_schema)
...
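
The contact_schema referenced in the load call is defined elsewhere in the project; purely as an illustration, it could look something like the sketch below, using hypothetical field names based on the contact fields listed earlier (firstName, lastName, email, phone, gender):

from pyspark.sql.types import StructType, StructField, StringType

# Hypothetical schema sketch; the real project may define more fields and different types
contact_schema = StructType([
    StructField("_id", StructType([StructField("oid", StringType(), True)]), True),
    StructField("firstName", StringType(), True),
    StructField("lastName", StringType(), True),
    StructField("email", StringType(), True),
    StructField("phone", StringType(), True),
    StructField("gender", StringType(), True),
])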

3.5. Unify contact data

After loading contact data into a Spark data frame, we can use the groupby function to group these contacts by primary_key (1). Then we will load contact_activity data into a data frame, because this collection is in the same Spark input database (2). Payment transaction data will be loaded via the Python MySQL connector (3). Finally, we will perform the merge logic to ensure the latest data is prioritized, as well as combine contact_activity data and payment data into the newly created unified contact (4).

# 1. group contact data frame by primary key
columns = [func.collect_list(x).alias(x) for x in df.columns]
raw_unified_df = df.groupby(primary_key).agg(*columns)
unified_contacts = []

# 2. load contact activities to data frame
contact_activities = spark.read.format("com.mongodb.spark.sql.DefaultSource") \
        .option("collection", "contact_activity") \
        .load()

# 3. load transactions from payment MySQL database
transactions = get_transactions()

# 4. merge data to create unified contact
for row in raw_unified_df.rdd.collect():
    unified_contact = {}
    for attr, columns in row.__dict__.items():
        for column in columns:
            if column == "_id":
                column_values = []
                for items in row[column]:
                    if items[0] is not None:
                        column_values.append(items[0])
                # store the ids of the contacts used to create this unified contact
                unified_contact["contact_ids"] = column_values
            else:
                # merge data using the latest value
                unified_contact[column] = row[column][-1]

    enrich_contact_activities(contact_activities, unified_contact)
    enrich_contact_payment(transactions, unified_contact)
    unified_contacts.append(unified_contact)
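
The enrich_contact_activities and enrich_contact_payment helpers are part of the project code and are not shown here; purely as an illustration, the activity enrichment might look roughly like the sketch below (the contact_id field name is an assumption):

from pyspark.sql import functions as func

def enrich_contact_activities(contact_activities, unified_contact):
    # Hypothetical sketch: collect the activities of all source contacts
    # that were merged into this unified contact
    activities = contact_activities \
        .filter(func.col("contact_id").isin(unified_contact["contact_ids"])) \
        .collect()
    unified_contact["activities"] = [activity.asDict() for activity in activities]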

3.6. Store unified contacts

The newly created unified contacts need to be converted to a data frame in order to utilize Spark’s power. We use the repartition method to distribute the workload of storing contact data across Spark workers, which is a very important step.

# store unified contact to unified_contact database
unified_contacts_df = spark.createDataFrame(unified_contacts, unified_contact_schema)
unified_contacts_df = unified_contacts_df.repartition(100)
unified_contacts_df.write.format("mongo").mode("append").save()
spark.stop()
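
After the job completes, a quick way to check the output is to count the documents in the target collection from the host, for example:

docker exec -it unify-mongo-db mongosh -u nht -p abc123 --authenticationDatabase admin \
  --eval "db.getSiblingDB('nht_unified_contact').unified_contact.countDocuments()"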

4. Conclusion

We’ve walked through the fundamental concepts of data unification, as well as how data unification delivers value to the business side by creating a unified view of data from multiple sources. We’ve also introduced Spark, including its basic components and APIs, to understand how its power can enable us to implement the data unification process.

Most noteworthy, we’ve implemented a comprehensive project to build a simple unify_service that can be run in a containerized environment using Docker. It is not always an easy task due to the complexity of setting up Spark, especially building Spark into a CI/CD pipeline, which is a very interesting topic to dive deeper into. Another state-of-the-art approach is running Spark on a cloud computing platform, such as AWS EMR, to reduce the development effort. The demo project can be found on GitHub.
