Journey of Transforming and Architecting Data Platforms using Lambda Architecture

David Christianto
12 min read · Jan 29, 2022

I have to say that I had remarkable experiences working with Data Platforms at Bukalapak. It has been two years since we established Data Platforms in GCP. There have been many extraordinary things that my colleagues and I have done during that period. Those experiences transformed my mindset and behavior significantly as a professional, and I want to share them with all of you.

Previously, I talked about how CDC sends events to Kafka. Now let’s talk about how we process those events until they are ready for our users (the end-to-end process). In this article, I would like to share an outline of my experience architecting our Data Platforms using Lambda architecture.

What was the situation before?

Before we talk about the current architecture, let’s briefly look at what happened and what challenges we faced before establishing Data Platforms in GCP.

Figure 1. On-premise Architecture

In Figure 1, you can see the old architecture and how we interconnected all the technologies. This architecture ran in an on-premise data center and was built entirely on common big data technologies. In other words, we had to maintain all of those technologies ourselves.

How we built Data Lake

A Data Lake is a storage repository that holds a vast amount of raw data (both structured and unstructured) in its native format. We built the data lake using Apache HBase running on top of Zookeeper and Apache Hadoop. In 2017, our team was not yet familiar with Spark Streaming. Therefore, we created our first streaming pipeline as a service (a Kafka client) running on a Kubernetes cluster. The streaming pipeline ingested everything from Kafka topics into HBase tables, as illustrated in Figure 2.

Figure 2. Data Ingestion Illustration

We had two types of data lake tables.

  1. Upsert table. It inserts or updates data based on the HBase row key.
  2. Append-only table. It only inserts new data and is partitioned by ingestion time (date & hour partition).
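
The difference between the two table types can be sketched in a few lines of Python. This is only an in-memory illustration, not our HBase code: the names `upsert_table`, `append_only_table`, `upsert`, and `append`, as well as the sample events, are assumptions made up for this example.

```python
from datetime import datetime, timezone

# Upsert table: one row per key; a newer event replaces the older one.
upsert_table = {}

# Append-only table: every event is kept, grouped by (date, hour) of ingestion.
append_only_table = {}


def upsert(row_key, event):
    """Insert the event, or overwrite the existing row with the same key."""
    upsert_table[row_key] = event


def append(event, ingested_at):
    """Always add a new row under the ingestion-time partition."""
    partition = (ingested_at.strftime("%Y-%m-%d"), ingested_at.strftime("%H"))
    append_only_table.setdefault(partition, []).append(event)


# Hypothetical events for the same order, ingested in two different hours.
events = [
    ({"id": "order-1", "status": "created"},
     datetime(2019, 5, 1, 9, 15, tzinfo=timezone.utc)),
    ({"id": "order-1", "status": "paid"},
     datetime(2019, 5, 1, 10, 2, tzinfo=timezone.utc)),
]
for event, ingested_at in events:
    upsert(event["id"], event)
    append(event, ingested_at)
```

After both events are processed, the upsert table holds only the latest state of `order-1`, while the append-only table keeps both versions in separate hour partitions.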

How we built Data Warehouse & Data Mart

A Data Warehouse is a data management system designed to enable and support business intelligence (BI) activities, especially analytics. We created and stored all data warehouses in HDFS running on top of Zookeeper and Apache Hadoop. We used Apache Spark for our ETL pipelines, which were scheduled daily or hourly, and Apache Airflow as our scheduler platform.

An ETL pipeline is the set of processes used to extract data from the data lake, transform it into cleansed, ready-to-use data, and load it into another destination. We had two main ETL pipelines; the other pipelines, created by Data Scientists and Data Analysts, were used to build data marts.

  1. Full-load ETL pipeline for creating daily updates.
  2. Incremental-load ETL pipeline for creating hourly updates.
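
To make the two pipeline types concrete, here is a minimal Python sketch. It is not our Spark code: the function names, the `updated_at` column, and the sample rows are assumptions for illustration only.

```python
def full_load(data_lake_rows):
    """Rebuild the whole warehouse table from every row in the data lake."""
    warehouse = {}
    for row in sorted(data_lake_rows, key=lambda r: r["updated_at"]):
        warehouse[row["id"]] = row  # later versions overwrite earlier ones
    return warehouse


def incremental_load(warehouse, data_lake_rows, since):
    """Apply only the rows that changed after the previous run (`since`)."""
    updated = dict(warehouse)
    changed = [r for r in data_lake_rows if r["updated_at"] > since]
    for row in sorted(changed, key=lambda r: r["updated_at"]):
        updated[row["id"]] = row
    return updated


# Hypothetical data lake content (ISO timestamps compare correctly as strings).
lake = [
    {"id": 1, "status": "created", "updated_at": "2019-05-01T09:00"},
    {"id": 2, "status": "created", "updated_at": "2019-05-01T09:30"},
]
daily = full_load(lake)

# A new version of order 1 arrives; the hourly run only reads recent changes.
lake.append({"id": 1, "status": "paid", "updated_at": "2019-05-01T10:05"})
hourly = incremental_load(daily, lake, since="2019-05-01T10:00")
```

The incremental run reads far less data per run, while the full load is the simpler way to guarantee that the table matches the data lake.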

How we built Real-time Data

As our company kept growing, more services were created and the demand for analytics grew with them. In 2019, we started experimenting with real-time tables. We decided to use Apache Kudu with Apache Impala and Spark Streaming. Apache Kudu is a distributed data storage engine that makes fast analytics on fast and changing data easy. Apache Impala is a modern distributed SQL query engine.

We built and stored the full history of the data (not only recent data) in Apache Kudu. The Apache Kudu API supports an upsert function, which was very convenient for us as Data Engineers. With this solution, we helped our data stakeholders analyze data and meet our company’s needs as a data-driven company.

To understand more about the ETL pipeline, you can see the illustration shown in Figure 3.

Figure 3. ETL Pipeline (Stream & Batch) Illustration

How we served our data to users

To serve the data warehouse and data marts to our stakeholders, we created external tables in Apache Hive and connected them to our PrestoDB cluster through the Hive connector. The PrestoDB cluster was the primary tool our stakeholders used to query all data in our Data Platforms.

Back then, we were not sure what architecture we were using. It was a bit like a “Lambda architecture”, but we did not implement the Lambda architecture best practices properly. Even so, I have to say that this architecture helped our company grow to this stage. As our data grew rapidly every day, we started facing many problems.

Infrastructure

  1. Hardware failures were something we should have expected, but when they happened, they were hard for our team to anticipate and handle. In one incident, it took a month to recover all the data.
  2. Scaling out was also really tough. Procuring machines took a long time before they were ready to use and, obviously, required large upfront purchases.

Services

  1. As a data lake, Apache HBase dropped data quite frequently. We spent a lot of time investigating and understanding the underlying problem. However, we never managed to achieve a long-term fix for that.
  2. Data block size in HDFS was not optimized, which caused performance and storage problems. At Bukalapak, each tribe (business unit) could create its own ETL pipelines in our data platforms so that the business could move faster without relying on the engineering team for everything. Because of this, we had a lot of ETL pipelines, and it was hard to manage the block size in HDFS.
  3. Slow data processing that ended in a low query success rate. We had more than 200 nodes powering data processing on the platform, but unfortunately, it had almost reached its maximum capacity. We received more and more complaints from users about how hard it was to finish their work on our data platform.
  4. Inadequate monitoring. As platform owners, we needed to understand everything that was happening. Without metrics that could guide us, we could not provide good governance. For example, we could not do any analysis to figure out the right approach to handle situations in which jobs were consuming most of the compute resources in a particular period.

Data Quality

  1. Data irregularities. Back then, we did not have proper data governance, and data validation was inadequate. We struggled a lot with schema and data type changes. We even found different categories of values in the same column and schema inconsistencies between the real-time data and the data warehouse.
  2. There were a lot of ETL pipeline types and behaviors. There was no standard for creating a good data warehouse design. In the end, this introduced a lot of new tech debt for our team.

Given all the bad experiences above, I do not think this is about a right or wrong architecture. Rather, it is a reminder that we need to continuously improve and evolve what we already have. So let’s see what we have done with those experiences, starting with a brief look at what Lambda Architecture is.

What is Lambda Architecture?

Lambda architecture is a way of processing massive quantities of data (“Big Data”) that provides access to both batch-processing and stream-processing methods through a hybrid approach. It is a general architecture used to solve the problem of computing arbitrary functions.

Figure 4. Lambda Architecture Overview

In figure 4, you can see three main components of Lambda Architecture. New data comes continuously as a feed to the data system. It gets fed to the batch layer and the speed layer simultaneously.

Batch Layer

This layer is responsible for managing the master dataset. The master dataset is the source of truth. Even if you lose all your serving layer and speed layer datasets, you can reconstruct your application from the master dataset. This layer runs on a predefined schedule, usually once or twice a day. The batch layer has two main functions:

  • Manage the master dataset.
  • Pre-compute the batch views.

Serving Layer

The output from the batch layer is batch views, and the output from the speed layer is near real-time views. This layer combines both views so that end-users can run ad-hoc queries against them with low latency.

Speed Layer

This layer handles data that does not yet exist in the batch view because of the latency of the batch layer. It only deals with recent data, not the full history, so to give users a complete view of the data, we have to combine it with the batch layer, as shown in the following image.

Figure 5. Data is indexed simultaneously by both the serving layer and the speed layer (Source)
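
The way a query combines the two layers can be sketched in Python as follows. This is a toy illustration of the concept, not part of our platform: `BATCH_CUTOFF_HOUR`, the event tuples, and the `query` function are assumptions made up for this example.

```python
from collections import Counter

BATCH_CUTOFF_HOUR = 10  # hypothetical: the last hour covered by the batch run

# (hour, product_id) pairs arriving on the feed.
events = [
    (8, "p1"), (9, "p1"), (9, "p2"), (10, "p1"), (11, "p2"), (12, "p1"),
]

# Batch layer: recomputed on a schedule from the master dataset.
batch_view = Counter(p for hour, p in events if hour <= BATCH_CUTOFF_HOUR)

# Speed layer: covers only what the batch run has not processed yet.
realtime_view = Counter(p for hour, p in events if hour > BATCH_CUTOFF_HOUR)


def query(product_id):
    """Serving layer: a complete answer is the sum of both views."""
    return batch_view[product_id] + realtime_view[product_id]
```

Once the next batch run catches up, the rows it has absorbed can be dropped from the speed layer, which is why the speed layer only needs to hold recent data.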

What actions did we take?

Our main goal was to solve the problems above. I agree we had a lot of issues, but we also learned a lot of lessons from those experiences. The key points to improve were simplicity, reliability, and elasticity, so that our team could focus on work that brings more value to our company. To get there, let’s take a step back and revisit our main problems.

Infrastructure

Cloud migration was the best answer to this problem. With cloud services, we do not need to spend time maintaining infrastructure, and we can provision new machines within minutes without large upfront costs. At that time, our technology team decided to use Google Cloud Platform (GCP). GCP offers numerous good services like Google Cloud Storage, Google Cloud Dataflow, Google Cloud Dataproc, Google BigQuery, and more. With these services, we almost never need to think about infrastructure.

Services

As you can read above, we used a large number of open-source tools in the on-premise data center. We did not want to do that again in GCP. Therefore, after several experiments, our team decided to use the following services.

  • Google Cloud Storage as a replacement for HDFS.
  • Google Cloud Dataflow is our solution for the streaming ETL pipelines. Although we could use Spark Streaming with Google Cloud Dataproc, we preferred to use Google Cloud Dataflow after considering the estimated cost.
  • Google Cloud Dataproc is our solution for the batch ETL pipelines. Although we could use Google Cloud Dataflow, we preferred to use Spark with Google Cloud Dataproc after considering the estimated cost.
  • Google BigQuery is our SQL compute engine for accessing our data warehouse and running analytics queries.
  • Looker is our solution for data visualizations.

In addition, all those services provide so many logs that we could build a monitoring dashboard or even a report.

Data Quality

To improve data quality, our team needed proper data governance. Therefore, we introduced a new service called SchemaHub, integrated with DataHub, to maintain the metadata of all data warehouse tables. DataHub is a tool that acts as a Data Catalog, as shown in figure 3 above. A Data Catalog is a collection of metadata combined with data management and search tools. In short, a Data Catalog has three roles.

  1. Help analysts and other data users to find the data that they need,
  2. Serve as an inventory of available data, and
  3. Provide information to evaluate the fitness of data for its intended uses.

SchemaHub stores related information like column names, data types and data definitions, table and column descriptions, metadata columns, etc. Therefore, every ETL pipeline had to process data according to a schema registered in the SchemaHub service.
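
As a rough idea of what validating data against a registered schema means, here is a small Python sketch. SchemaHub's real format and API are not described in this article, so `REGISTERED_SCHEMAS`, the `orders` table, and the `validate` function are all hypothetical.

```python
# Hypothetical registry entry; this is not the real SchemaHub format.
REGISTERED_SCHEMAS = {
    "orders": {"order_id": int, "status": str, "amount": float},
}


def validate(table, record):
    """Return a list of problems; an empty list means the record conforms."""
    schema = REGISTERED_SCHEMAS[table]
    problems = []
    for column, expected_type in schema.items():
        if column not in record:
            problems.append(f"missing column: {column}")
        elif not isinstance(record[column], expected_type):
            problems.append(
                f"{column}: expected {expected_type.__name__}, "
                f"got {type(record[column]).__name__}"
            )
    # Columns that nobody registered are also treated as irregularities.
    for column in record:
        if column not in schema:
            problems.append(f"unregistered column: {column}")
    return problems
```

A pipeline could call `validate("orders", record)` for each record and route anything with a non-empty result away from the warehouse, instead of letting a changed data type slip through silently.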

To improve the quality of our data warehouse, our team planned to use the Star Schema design. The star schema is the simplest and most basic schema for data warehouses and dimensional data marts. It consists of one or more fact tables referencing any number of dimension tables.
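
A tiny star schema can be modeled in Python to show how a fact table references its dimensions. The tables, keys, and amounts below are invented for illustration and are not our actual warehouse design.

```python
from collections import defaultdict

# Dimension tables: descriptive attributes, keyed by a surrogate key.
dim_product = {
    1: {"name": "Phone case", "category": "Accessories"},
    2: {"name": "Charger", "category": "Accessories"},
    3: {"name": "Novel", "category": "Books"},
}
dim_date = {20220101: {"month": "2022-01"}, 20220201: {"month": "2022-02"}}

# Fact table: one row per sale, holding measures plus foreign keys.
fact_sales = [
    {"product_key": 1, "date_key": 20220101, "amount": 50},
    {"product_key": 2, "date_key": 20220101, "amount": 80},
    {"product_key": 3, "date_key": 20220201, "amount": 120},
    {"product_key": 1, "date_key": 20220201, "amount": 50},
]


def revenue_by(attribute_of):
    """Sum the `amount` measure, grouped by an attribute looked up in a dimension."""
    totals = defaultdict(int)
    for sale in fact_sales:
        totals[attribute_of(sale)] += sale["amount"]
    return dict(totals)


by_category = revenue_by(lambda s: dim_product[s["product_key"]]["category"])
by_month = revenue_by(lambda s: dim_date[s["date_key"]]["month"])
```

The same fact table answers both questions; only the dimension used for grouping changes, which is what keeps the design simple for analysts.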

How does our architecture look now?

Figure 6. GCP Architecture Overview

Figure 6 shows an end-to-end overview of our architecture as it looks now. We replaced several components with Google services and changed our ETL pipeline implementation, as I’ve mentioned above. With this implementation, we simplified our Data Platform tools.

How Data Lake looks now

We standardized our Data Lake on append-only tables with a date-hour partition, stored in Google Cloud Storage (GCS). Hence, all data lake tables have the same schema and definition at a high level. We use Google Cloud Dataflow as our streaming ETL pipeline to stream all events from Kafka to GCS in near real-time.
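
A date-hour partition can be pictured as a path prefix derived from the ingestion time. The sketch below is an assumption about how such a layout might look: the bucket name, the `date=`/`hour=` naming, and the `partition_path` helper are hypothetical and not our actual GCS layout.

```python
from datetime import datetime, timezone


def partition_path(bucket, table, ingested_at):
    """Build the object prefix for the hour in which an event was ingested."""
    utc = ingested_at.astimezone(timezone.utc)
    return f"gs://{bucket}/{table}/date={utc:%Y-%m-%d}/hour={utc:%H}/"


# Hypothetical bucket and table names.
example = partition_path(
    "example-data-lake",
    "orders",
    datetime(2022, 1, 29, 7, 45, tzinfo=timezone.utc),
)
```

With a layout like this, a batch job that needs one hour of data only has to list a single prefix.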

How Data Warehouse & Data Mart look now

We standardized our Data Warehouse and Data Mart on the Star Schema design, stored in BigQuery Storage, leveraging BigQuery Storage capabilities like automatic data compaction, data retention, etc. Our ETL pipelines use Spark and run on Google Cloud Dataproc (based on our earlier experiment on estimated cost). They rely on the schema provided by SchemaHub to improve data quality and data governance. In addition, we store the historical data warehouse in Google Cloud Storage with the following lifecycle.

  • Delete Objects that have not been updated for 365+ days.
  • Move Objects to the Coldline storage class if they have not been updated for 90+ days.
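
In GCS, rules like these are expressed as a bucket lifecycle configuration. The sketch below builds one in Python and prints it as JSON. It is an approximation, not our actual configuration: the `age` condition counts days since an object was created, and I am assuming here that it is close enough to "not updated", since objects in GCS are replaced rather than edited in place.

```python
import json

# Assumed approximation of the two rules above using the `age` condition.
lifecycle = {
    "rule": [
        {
            "action": {"type": "SetStorageClass", "storageClass": "COLDLINE"},
            "condition": {"age": 90},
        },
        {
            "action": {"type": "Delete"},
            "condition": {"age": 365},
        },
    ]
}

print(json.dumps(lifecycle, indent=2))
```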

How Real-time Data looks now

We standardized our real-time data on append-only tables with a timestamp partition, stored in BigQuery Storage. We use Google Cloud Dataflow as our streaming ETL pipeline to stream all events from Kafka to BigQuery in near real-time. The streaming ETL pipelines also rely on the schema provided by SchemaHub, with additional metadata columns used to deduplicate data in the real-time view.

As defined in the Lambda Architecture concept, we only store the recent data that is not yet available in the batch layer (as shown in figure 5 above) by setting a retention policy on the BigQuery table.

How the Serving Layer looks now

We have more than ten tribes that operate independently. We designed a decentralized data platform where each tribe has its own data platform. Hence, each tribe stores its data in its own BigQuery Storage. BigQuery separates storage from compute. Therefore, we can have another BigQuery act as a compute engine and create views on that same BigQuery to make the data queryable. Then, we can connect it to other services like Looker and Redash.

Last but not least, to help our users get complete real-time data, we provide real-time views built with a LEFT JOIN and a UNION ALL.

-- Keep only the latest row per <unique_id> from the speed layer.
WITH
  distinct_speed_layer AS (
    SELECT
      <any_columns>
    FROM (
      SELECT
        stream.*,
        ROW_NUMBER() OVER (PARTITION BY <unique_id> ORDER BY <ingestion_time> DESC) AS row_number
      FROM
        <speed_layer_table> AS stream )
    WHERE
      row_number = 1 )
-- Batch rows that have no newer version in the speed layer...
SELECT
  <any_columns>
FROM
  <batch_layer_table> AS batch_layer
LEFT JOIN (
  SELECT
    DISTINCT <unique_id>
  FROM
    distinct_speed_layer ) AS speed_layer
ON
  batch_layer.<unique_id> = speed_layer.<unique_id>
WHERE
  speed_layer.<unique_id> IS NULL
-- ...plus the deduplicated speed-layer rows.
UNION ALL
SELECT
  <any_columns>
FROM
  distinct_speed_layer
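
For readers who prefer code to SQL, the logic of this query can be mirrored on plain Python lists. This is only a model of what the view computes, not something we run: the function name `realtime_view`, the column names `id` and `ingestion_time`, and the sample rows are assumptions.

```python
def realtime_view(batch_rows, speed_rows, key="id", order_by="ingestion_time"):
    # Step 1: keep only the latest speed-layer row per key (ROW_NUMBER() = 1).
    latest = {}
    for row in speed_rows:
        current = latest.get(row[key])
        if current is None or row[order_by] > current[order_by]:
            latest[row[key]] = row
    # Step 2: anti-join, keep batch rows whose key is absent from the speed layer.
    untouched = [row for row in batch_rows if row[key] not in latest]
    # Step 3: UNION ALL of the untouched batch rows and the deduplicated speed rows.
    return untouched + list(latest.values())


# Hypothetical rows: order 2 changed twice since the last batch run, order 3 is new.
batch = [
    {"id": 1, "status": "created", "ingestion_time": 1},
    {"id": 2, "status": "paid", "ingestion_time": 2},
]
speed = [
    {"id": 2, "status": "shipped", "ingestion_time": 5},
    {"id": 2, "status": "delivered", "ingestion_time": 6},
    {"id": 3, "status": "created", "ingestion_time": 7},
]
view = realtime_view(batch, speed)
```

Order 1 comes from the batch layer unchanged, order 2 is replaced by its latest speed-layer version, and order 3 appears only because of the speed layer.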

How the GCP services map onto Lambda Architecture

Figure 7. Lambda Architecture in GCP

Figure 7 shows the detailed workflow when all of these pieces are put together. You can see how the data flows and is processed, and you can tell apart the batch, speed, and serving layers.

Conclusion

As a result, we improved a lot of things and established them in GCP. Now, our team is more confident in our Data Platforms and has built trust with our users. The following are the key points that we have achieved over the last two years.

  1. Migrate Data Platforms to GCP and eliminate the capital expense.
  2. Build high-quality and reliable data & platforms with better logging, monitoring, and alerting.
  3. Reduce and prevent data irregularities. We have received almost no complaints about data irregularities caused by our ETL pipelines for the last two years.
  4. Increase the query success rate. Back then, we could not even measure the query success rate. Now, we can guarantee a query success rate of at least 90% during peak hours.
  5. Provide an SLA of at least 99.9% for data readiness. In other words, over a 30-day month, we allow at most about 43 minutes in which the correct data is not ready.
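
The 43-minute figure follows directly from the SLA percentage. Here is the arithmetic as a short Python sketch; the helper name `allowed_downtime_minutes` is just for illustration.

```python
from fractions import Fraction


def allowed_downtime_minutes(sla_percent, days=30):
    """Minutes per period that can be missed while still meeting the SLA.

    `sla_percent` is passed as a string (for example "99.9") so that the
    calculation is done with exact fractions instead of binary floats.
    """
    total_minutes = days * 24 * 60  # 43,200 minutes in a 30-day month
    return float(total_minutes * (1 - Fraction(sla_percent) / 100))


monthly_budget = allowed_downtime_minutes("99.9")  # 43.2 minutes
```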

Lastly, I would like to thank all my managers, colleagues, ex-managers, and ex-colleagues who have worked so hard together over the past two years. With all of you, we built a lot of remarkable and magnificent experiences together.

I can still see room for improvement. I hope we can make it even better in the future as our company grows, especially after its IPO last year (2021). Thank you for reading my article all the way to the end.

Written by David Christianto

Data Engineer, Mentor, and Data Ingestion Expertise | T-Shaped skills @ Bukalapak — Helping company grow their businesses & improve cost efficiency