Change Data Capture as a Gateway to the Big Data and Streaming Platforms
This is my first article, so I want to quickly introduce myself. I have been working as a Data Engineer at Bukalapak since December 2017. Back then, I was very interested in the growth of data as companies re-architect themselves to become fully digital. That motivated me to learn more about Big Data platforms and how they are implemented in a real company. In this article, I would like to share my experience working with Big Data platforms. I will start with the first gateway into them, called CDC (Change Data Capture), and my development experience with it.
What is Change Data Capture (CDC)?
First of all, why would you need something like CDC in the first place? Let’s say your company keeps growing, and so does your number of active users. As a data-driven company, you collect data that gives actionable insights into what your service is doing. You then analyze this data to find out how to grow your business further. The problem is that such analysis can involve long-running queries. You would not want to run them against your production master database because they would degrade your application's performance.
CDC is designed to solve this problem. CDC is the process of capturing changes made at the data source and applying them throughout the Big Data platforms. Traditionally, data changes could also be captured in other ways. The most common approaches are below:
1. Batch load or trigger function
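The simplest approach is to use trigger functions that record changes as they happen. Alternatively, you can run a query periodically (say, every 10 minutes or less) against every table in the database to fetch the rows that changed in that window. A periodic query looks like this: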
SELECT column1, column2, ...
FROM table_name
WHERE updated_at >= '2020-01-01 00:00:00'
  AND updated_at < '2020-01-01 00:10:00';
2. Manually send the data changes from your service to the Big Data platforms
This means that services owned by different teams have to send their data to the Big Data platforms themselves. However, it is often tough to get other teams to prioritize this work.
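The first approach, polling by updated_at, can be sketched with Python's built-in sqlite3 module. This is a minimal illustration under assumed names: the orders table, its columns, and the helper functions are hypothetical and not part of any system described here. Each poll reads one half-open time window, and the next poll starts where the previous one ended.

```python
import sqlite3


def create_demo_db():
    """Create an in-memory table with a hypothetical schema: orders(id, status, updated_at)."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE orders (id INTEGER PRIMARY KEY, status TEXT, updated_at TEXT)")
    conn.executemany(
        "INSERT INTO orders VALUES (?, ?, ?)",
        [(1, "paid", "2020-01-01 00:05:00"), (2, "shipped", "2020-01-01 00:12:00")],
    )
    return conn


def fetch_changes(conn, window_start, window_end):
    """Return rows whose updated_at falls in the half-open window [window_start, window_end)."""
    cur = conn.execute(
        "SELECT id, status, updated_at FROM orders "
        "WHERE updated_at >= ? AND updated_at < ? ORDER BY updated_at",
        (window_start, window_end),
    )
    return cur.fetchall()


if __name__ == "__main__":
    conn = create_demo_db()
    # One poll per 10-minute window; consecutive windows share a boundary but never overlap.
    print(fetch_changes(conn, "2020-01-01 00:00:00", "2020-01-01 00:10:00"))
    print(fetch_changes(conn, "2020-01-01 00:10:00", "2020-01-01 00:20:00"))
```

Every poll re-queries every table, which is where the processing cost comes from. A deleted row simply disappears from the table, so a query like this cannot report deletes at all.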
Both approaches create heavy dependencies on other teams and consume a lot of processing power, so they are not efficient. At the heart of any database sits the transaction log. It goes by different names in different technologies, such as a binlog, an oplog, or a write-ahead log (WAL), but the fundamental concept remains the same. The transaction log is an immutable, sequential log of every transaction performed by every user and application of the database. That includes inserts, updates, deletes, commits, and rollbacks.
CDC connects to the existing databases and reads the data changes from their transaction logs. It then streams these changes into the Big Data platforms without interrupting other teams' services.
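The core idea of CDC is to read changes from an append-only log instead of querying the tables. That idea can be modeled in a few lines. This is a toy in-memory sketch, not how Debezium or a real binlog reader is implemented. The integer positions stand in for binlog file offsets or oplog timestamps, and the sink list stands in for a Kafka topic.

```python
from dataclasses import dataclass
from typing import Dict, Iterator, List


@dataclass(frozen=True)
class LogEntry:
    position: int  # sequential position, standing in for a binlog offset or oplog timestamp
    op: str        # "insert", "update", or "delete"
    table: str
    row: Dict[str, object]


class TransactionLog:
    """Toy append-only log: entries are appended in order and never modified."""

    def __init__(self) -> None:
        self._entries: List[LogEntry] = []

    def append(self, op: str, table: str, row: Dict[str, object]) -> LogEntry:
        entry = LogEntry(len(self._entries), op, table, dict(row))
        self._entries.append(entry)
        return entry

    def read_from(self, position: int) -> Iterator[LogEntry]:
        """Yield every entry at or after `position`, in log order."""
        yield from self._entries[position:]


def capture(log: TransactionLog, start_position: int, sink: List[LogEntry]) -> int:
    """Tail the log from start_position, forwarding each change to sink.

    Returns the position to resume from on the next call.
    """
    next_position = start_position
    for entry in log.read_from(start_position):
        sink.append(entry)  # in a real pipeline this would be a Kafka producer
        next_position = entry.position + 1
    return next_position


if __name__ == "__main__":
    log = TransactionLog()
    log.append("insert", "orders", {"id": 1, "status": "new"})
    log.append("update", "orders", {"id": 1, "status": "paid"})
    sink: List[LogEntry] = []
    pos = capture(log, 0, sink)       # picks up both changes
    log.append("delete", "orders", {"id": 1})
    pos = capture(log, pos, sink)     # picks up only the new delete
    print(pos, [e.op for e in sink])  # 3 ['insert', 'update', 'delete']
```

Unlike the polling query, the reader sees deletes. It also never scans the tables themselves, because it only reads what the database has already appended.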
What is the impact on business metrics?
CDC reduces the time and resource costs of data warehousing while enabling continuous data integration. CDC pipelines scale more easily and can carry out high-volume data transfers in real time without disrupting operational activities. With CDC, we can power any streaming application. An event streaming platform lets you rapidly tackle compelling use cases that have a positive impact on the business.
Our team's experience working with CDC
Our company has used Apache Kafka for many years, so our team re-architected our Big Data ecosystem around it. Apache Kafka is a powerful platform that enables real-time, scalable, event-driven integration and processing across the enterprise. There are several open-source CDC solutions, and one of them is Debezium. Our company started using Debezium as its CDC solution in 2017. In figure 1, you can see how we used CDC in our Big Data ecosystem.
In the architecture above, Debezium connectors for MySQL and MongoDB are deployed to capture data changes from these two types of databases into Kafka. We also used Kafka Connect, a tool for scalably and reliably streaming data between Apache Kafka and other data systems. Debezium is built on top of Kafka Connect, which runs its connectors.
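For readers unfamiliar with Kafka Connect, a Debezium connector is typically registered by POSTing a JSON definition to the Connect worker's `/connectors` REST endpoint. The sketch below only builds that request and does not send it. It assumes the property names of older Debezium 1.x releases, since Debezium 2.x renamed several of them (for example, `database.server.name` became `topic.prefix`). The article does not say how our connectors were actually configured, and all hostnames, credentials, and table names in the example are hypothetical.

```python
import json
import urllib.request


def debezium_mysql_connector(name, host, port, user, password, server_id,
                             server_name, kafka_servers, tables):
    """Build a Kafka Connect connector definition for the Debezium MySQL connector.

    Property names follow older Debezium 1.x releases; check your version's docs.
    """
    return {
        "name": name,
        "config": {
            "connector.class": "io.debezium.connector.mysql.MySqlConnector",
            "tasks.max": "1",  # the MySQL connector reads the binlog with a single task
            "database.hostname": host,
            "database.port": str(port),
            "database.user": user,
            "database.password": password,
            "database.server.id": str(server_id),  # must be unique among replication clients
            "database.server.name": server_name,   # logical name, used as the topic prefix
            "database.history.kafka.bootstrap.servers": kafka_servers,
            "database.history.kafka.topic": f"schema-changes.{server_name}",
            "table.whitelist": ",".join(tables),
        },
    }


def build_register_request(connect_url, connector):
    """Prepare (but do not send) a POST to the Kafka Connect REST API's /connectors endpoint."""
    return urllib.request.Request(
        f"{connect_url.rstrip('/')}/connectors",
        data=json.dumps(connector).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


if __name__ == "__main__":
    connector = debezium_mysql_connector(
        "orders-mysql", "mysql.internal", 3306, "cdc_user", "secret",
        184054, "orders_db", "kafka:9092", ["shop.orders", "shop.payments"],
    )
    req = build_register_request("http://localhost:8083", connector)
    print(req.get_method(), req.full_url)
    # urllib.request.urlopen(req) would submit it to a running Connect worker.
```

Because every connector lives inside the shared Connect cluster, adding or reconfiguring one through this endpoint is exactly the kind of operation that can trigger the cluster-wide rebalancing described in the problems below.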
What happened to our platforms after using Debezium?
After our team implemented this architecture, we were able to develop an event streaming platform. This article focuses on CDC, so I will not explain what we built on our streaming platform here. Instead, I will share our experience with the streaming platform in another article. The streaming platform enables us to stream events in real time and build a dedicated database for Business Intelligence (BI) purposes, also called a data warehouse. With this data warehouse, Data Scientists were able to explore and analyze various data. Eventually, we could encourage our company to start new data analytics projects, such as AI projects using Machine Learning, recommendation systems, and fraud detection. Through these projects, we were able to make a positive impact on our business.
As our company kept growing, more services were created and more analytics demands arose, so our data grew quite rapidly every day. Back then, our Debezium connectors were failing almost every week, or even every day. Every time a connector failed, we needed to troubleshoot it manually. As a result, the time we spent on operational activities kept increasing, which was no longer efficient. The following are the problems we found when using Debezium.
- Debezium took a long time to snapshot large tables, and the time grew as the data grew. Snapshotting a single large table could take 2 or 3 days to finish.
- The connectors kept restarting whenever we deployed a new connector or changed a connector's config. This happened because of Kafka Connect's rebalancing mechanism. Kafka Connect would rebalance the cluster by stopping all tasks, recomputing the assignment of all jobs, and then starting everything again. This behavior applies to Kafka Connect versions before 2.3.
- The connector failed due to table schema mismatches. This problem occurred when another team changed a table's schema.
- There were serialization inconsistencies. For example, the same boolean column could be serialized in different forms, such as 0. This problem occurred when we changed the DDL parser mode config.
- There were problems with the checkpoint mechanism. We often found data loss when a connector failed or restarted, and data loss is a big problem when we talk about Big Data.
The version of Debezium we used at that time was too problematic for our system. We also could not upgrade it because of our MySQL server version: newer Debezium versions use a MySQL connector version that does not support our MySQL server. Therefore, in 2019, we discussed the options and decided to develop our own CDC project.
Building an in-house Change Data Capture (CDC) to replace Debezium
Our team kicked off our CDC project, called Gargantua, in Q4 2019. We designed it to be more robust and independent. It no longer needs Kafka Connect; instead, Kafka ingestion is handled separately. We can also explicitly specify where checkpoints are saved. That gives us the ability to rewind a database's transaction log whenever our team needs to. Gargantua can also execute a Spark script to do a full snapshot. With a Spark cluster, the snapshot finishes faster, depending on the cluster size. Gargantua is designed like many open-source projects: it is a library rather than an application or service (we may share the project publicly in the future). Therefore, running it as a CDC service requires a separate project. At our company, we use a Kubernetes cluster as our service platform. In figure 2, you can see how we use Gargantua in our Big Data ecosystem.
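The article does not describe how Gargantua hands off from the Spark snapshot to log streaming. One common pattern works as follows: record the log position before the snapshot starts, load the snapshot, then replay the log from that position. The sketch below shows that generic pattern with plain Python dictionaries standing in for the Spark job and the target store. The function name and the tuple layout of each change are hypothetical.

```python
from typing import Dict, List, Optional, Tuple

Row = Dict[str, object]
# (log position, op, primary key, full row after the change or None for deletes)
Change = Tuple[int, str, int, Optional[Row]]


def snapshot_then_stream(snapshot_rows: Dict[int, Row],
                         log: List[Change],
                         position_at_snapshot: int) -> Dict[int, Row]:
    """Build a target table from a bulk snapshot plus log changes after a recorded position.

    position_at_snapshot is the log position recorded just *before* the snapshot began,
    so no change can fall into a gap between the snapshot and the stream.
    """
    # Step 1: load the bulk snapshot (the part a Spark job would parallelize).
    target = {key: dict(row) for key, row in snapshot_rows.items()}
    # Step 2: replay the log from the recorded position. Each change carries the full row,
    # so re-applying a change the snapshot already saw simply rewrites the same value.
    for position, op, key, row in log:
        if position < position_at_snapshot:
            continue
        if op == "delete":
            target.pop(key, None)
        else:  # "insert" or "update"
            target[key] = dict(row)
    return target
```

Overlap between the snapshot and the stream is harmless here because changes are applied as full-row upserts and deletes in log order. After the replay, the target converges to the current state. The same replay loop is what makes rewinding useful: starting from an earlier position simply re-applies changes.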
In the new architecture above, we use Cassandra to store the transaction log checkpoints. Why Cassandra? In terms of the CAP theorem, we did not need strong consistency for checkpoints. We needed availability and partition tolerance, which is what Cassandra favors. By default, Gargantua saves checkpoint info periodically according to its schedule config (typically every 500 ms or less). So if the service occasionally fails to store a checkpoint in Cassandra, we can tolerate the missed checkpoint. If you are interested in more detail about Gargantua or how it works, see our article on how the MySQL Binary Log Connector works in Change Data Capture.
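A missed checkpoint is tolerable because the checkpoint only lags behind the real position. A restart then re-reads a few events instead of skipping any. The sketch below illustrates that reasoning under stated assumptions. The in-memory store is a hypothetical stand-in for the Cassandra table, and the class names, the 500 ms default, and the `fail_next_write` switch are illustrative, not Gargantua's API.

```python
class InMemoryCheckpointStore:
    """Stand-in for a Cassandra checkpoint table: one log position per connector name."""

    def __init__(self):
        self.positions = {}
        self.fail_next_write = False  # flip to simulate one failed write

    def save(self, name, position):
        if self.fail_next_write:
            self.fail_next_write = False
            raise ConnectionError("simulated checkpoint write failure")
        self.positions[name] = position

    def load(self, name, default=0):
        return self.positions.get(name, default)


class PeriodicCheckpointer:
    """Saves the latest delivered log position at most once per interval_ms."""

    def __init__(self, store, name, interval_ms=500):
        self.store = store
        self.name = name
        self.interval_ms = interval_ms
        self.last_flush_ms = None

    def maybe_flush(self, position, now_ms):
        """Try to save `position`; return True only if it was actually stored."""
        if self.last_flush_ms is not None and now_ms - self.last_flush_ms < self.interval_ms:
            return False  # too soon since the last successful save
        try:
            self.store.save(self.name, position)
        except ConnectionError:
            return False  # tolerate the miss; a later flush stores a newer position
        self.last_flush_ms = now_ms
        return True

    def rewind(self, position):
        """Move the checkpoint back (typically while stopped) so a restart re-reads from `position`."""
        self.store.save(self.name, position)


if __name__ == "__main__":
    store = InMemoryCheckpointStore()
    cp = PeriodicCheckpointer(store, "orders-db")
    cp.maybe_flush(10, now_ms=0)
    store.fail_next_write = True
    cp.maybe_flush(30, now_ms=600)  # this write is lost
    print(store.load("orders-db"))  # 10
```

If the service restarted right after the lost write, it would resume from position 10 and re-send events 10 through 30. The failure therefore shows up downstream as duplicates rather than data loss, which consumers can absorb with idempotent, key-based upserts.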
How did we migrate our CDC service from Debezium to Gargantua?
For the migration, we wanted to keep all services working properly and integrated as they were in the previous architecture. Therefore, we had to minimize changes and ensure that the new process produced the same results, ideally with better functionality. To achieve these goals, our team took the following steps.
- For the sake of migration simplicity, we decided to keep the Debezium data format (Debezium output), so we did not need to change our streaming applications. While developing Gargantua, we also found a very useful new value: the native time of the transaction log, for both MySQL and MongoDB. The native time is the actual time at which the database wrote the transaction log entry. Debezium did not provide this value in the version we used, so we added it to the Gargantua output.
- We studied in detail all the kinds of events written to the database transaction logs (binlog, oplog, etc.) so that we could handle each of them correctly and even provide more valuable metrics.
- We wrote many unit tests and ran quite extensive integration tests in the staging environment. That boosted our confidence in the reliability of the service.
- We built better monitoring and alerting for our platforms so that we get better feedback from Gargantua. With Debezium, it was very hard to integrate its metrics into our monitoring system and to create the custom metrics we needed.
- We set up a CI/CD pipeline for our CDC service so that development and production deployments are more efficient and take much less effort.
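The first two steps come down to a few concrete operations. Each raw log event is classified, row changes are wrapped in a Debezium-style envelope, and the database's own timestamp is attached. The sketch below shows this for MySQL binlog row events and maps MongoDB oplog op codes as well. It assumes simplified event-type names. Gargantua's actual field names are not given in the article, so `native_ts_ms` is a hypothetical name. The `before`/`after`/`source`/`op`/`ts_ms` layout and the `c`/`u`/`d` op codes follow Debezium's change-event format.

```python
import time

# Simplified names for MySQL row events (real binlog readers also distinguish v1/v2 variants).
BINLOG_OPS = {"WRITE_ROWS": "c", "UPDATE_ROWS": "u", "DELETE_ROWS": "d"}
# MongoDB oplog "op" codes; "c" (command) and "n" (no-op) are not row changes.
OPLOG_OPS = {"i": "c", "u": "u", "d": "d"}


def to_debezium_op(source_kind, event_type):
    """Map a raw log event type to a Debezium op code, or None if it is not a row change."""
    ops = BINLOG_OPS if source_kind == "mysql" else OPLOG_OPS
    return ops.get(event_type)


def binlog_row_to_envelope(event_type, before, after, header_ts_sec, db, table, now_ms=None):
    """Wrap one MySQL row change in a Debezium-style envelope plus a native timestamp.

    header_ts_sec is the timestamp in the binlog event header (seconds), i.e. when the
    database wrote the event. "native_ts_ms" is a hypothetical field name.
    """
    op = to_debezium_op("mysql", event_type)
    if op is None:
        return None  # statement events such as DDL (QUERY), commits (XID), etc. go elsewhere
    return {
        "before": before,  # row image before the change (None for inserts)
        "after": after,    # row image after the change (None for deletes)
        "source": {"connector": "mysql", "db": db, "table": table,
                   "native_ts_ms": header_ts_sec * 1000},
        "op": op,
        # Like Debezium's top-level ts_ms: when the connector processed the event.
        "ts_ms": now_ms if now_ms is not None else int(time.time() * 1000),
    }


if __name__ == "__main__":
    env = binlog_row_to_envelope("UPDATE_ROWS", {"id": 1, "status": "new"},
                                 {"id": 1, "status": "paid"}, 1577836800, "shop", "orders")
    print(env["op"], env["source"]["native_ts_ms"])  # u 1577836800000
```

The key point is the distinction between the two timestamps. `ts_ms` records when the pipeline saw the change, which shifts with lag and restarts. The native time records when the database committed it, which stays fixed no matter when the event is processed.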
With Gargantua, we eliminated all the problems we had found with Debezium. Most importantly, we have better control over CDC development. The following are the key benefits that show Gargantua's robustness and reliability.
- A full snapshot of a large table takes only 1 or 2 hours (depending on the cluster size).
- It prevents data loss in our Big Data platform, as proven in production: we have had no data loss since we started using Gargantua.
- It can handle any table schema change.
- We can explicitly specify the database transaction log checkpoints, so we can easily rewind the transaction logs.
- Most important of all, Gargantua provides the native time of the transaction log. We therefore no longer need to rely on other teams' services to maintain the updated_at column. Before we used the native time, we sometimes found data irregularities in our data warehouse. They happened because another team's service updated data without updating the updated_at column, and as a result our streaming and batch applications could not process the data correctly.
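The updated_at problem is easy to reproduce in miniature. In the sketch below, a hypothetical service updates a row without bumping updated_at. A window query on updated_at misses the change, while a read of the change log by native time finds it. The table, timestamps, and helper names are illustrative assumptions, and the `change_log` list is a toy stand-in for a real binlog or oplog.

```python
import sqlite3


def build_example():
    """Create a table and a toy change log that records when each write actually happened."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE orders (id INTEGER PRIMARY KEY, status TEXT, updated_at TEXT)")
    change_log = []  # (native_time, id, status): the after-image a log-based reader would see

    def write(sql, params, row_id, native_time):
        conn.execute(sql, params)
        status = conn.execute("SELECT status FROM orders WHERE id = ?", (row_id,)).fetchone()[0]
        change_log.append((native_time, row_id, status))

    write("INSERT INTO orders VALUES (?, ?, ?)", (1, "new", "2020-01-01 00:01:00"),
          1, "2020-01-01 00:01:00")
    # Another service marks the order as paid but forgets to bump updated_at.
    write("UPDATE orders SET status = ? WHERE id = ?", ("paid", 1), 1, "2020-01-01 00:15:00")
    return conn, change_log


def poll_by_updated_at(conn, start, end):
    """Query-based capture: relies on every writer maintaining updated_at."""
    return conn.execute(
        "SELECT id, status FROM orders WHERE updated_at >= ? AND updated_at < ?",
        (start, end),
    ).fetchall()


def read_log_by_native_time(change_log, start, end):
    """Log-based capture: uses the time the database itself recorded for each write."""
    return [(row_id, status) for t, row_id, status in change_log if start <= t < end]


if __name__ == "__main__":
    conn, change_log = build_example()
    window = ("2020-01-01 00:10:00", "2020-01-01 00:20:00")
    print(poll_by_updated_at(conn, *window))             # []
    print(read_log_by_native_time(change_log, *window))  # [(1, 'paid')]
```

The native time is written by the database, not by application code, so it stays correct even when a service forgets to maintain its own timestamp columns.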
Thank you for reading my story all the way through. If you are interested in learning more about our team's story, see our Data Platform Transformation at Bukalapak article.