Data Reconciliation in Distributed Systems
As a tech lead, I’ve faced numerous challenges in maintaining data consistency across distributed systems. One particularly tricky issue we encountered recently was keeping our microservices in sync with our monolithic database. In this article, I’ll share our experience and the solution we implemented, including some code snippets in Go.
The Problem: Data Drift in Distributed Systems
Our architecture consisted of a primary monolithic service with its own database, and several microservices that replicated specific data (tenants, users, groups, and user-group associations) into their own databases through Kafka. This setup worked well most of the time, but we occasionally faced data inconsistencies due to factors such as:
- Bugs in the replication code
- Restoration of the monolith database from backups
For example, a user could be created and replicated to a service, but if the monolith’s database was restored from a backup where that user didn’t exist, we’d end up with a phantom user in our service database.
The Solution: Introducing Reconciliation Mode
To address these inconsistencies, we introduced a reconciliation mode for our services. When deployed with this mode enabled, a service would:
- Temporarily pause incoming data from Kafka
- Fetch the latest data from the monolith via API
- Compare this data with its own database
- Make necessary adjustments to bring its data in line with the source of truth
It could also be done by directly accessing the monolith’s database. But while direct database access might seem faster, it’s generally considered a bad practice for several reasons:
- Coupling: Direct database access creates tight coupling between the services and the database schema, and gives every service a way to corrupt the data. Read-only access partly mitigates this, but it’s still risky.
- Performance: Uncontrolled queries from multiple services could degrade database performance. API calls add load too, but as long as the monolith is scalable, that load is easier to manage.
- Consistency: The API can enforce business logic and data validation that direct database access would bypass. If the API exposes computed fields, reading the database directly would also force us to re-implement that logic on the receiving side.
The Reconciliation Process
Our reconciliation process needed to handle several scenarios for each entity type:
- Entities existing in the monolith but missing in the service
- Entities missing in the monolith but present in the service
- Entities existing in both places but with different data
The first and third scenarios can be handled with a simple upsert over the whole dataset. The second scenario, however, needs a different approach: the data in question is missing from the API response, so we have to iterate over the service’s own database and check each record against the preloaded API data.
Let’s look at a simplified version of our reconciliation script in Go:
type Reconciler struct {
	db        DB
	dbService DBService
	api       APIService
}

func (r *Reconciler) Start() {
	// First, process all tenants from the API and upsert them into the DB.
	tenants := r.api.FetchTenants()
	for _, apiTenant := range tenants {
		r.dbService.UpsertTenant(r.db, apiTenant)
	}
	for _, tenant := range tenants {
		// Fetch all data for the tenant: users, groups and users_groups.
		data := r.api.FetchTenantData(tenant)
		// Upsert all users.
		for _, user := range data.Users {
			r.dbService.UpsertUser(r.db, user)
		}
		// Delete users from the DB which do not exist in the API response.
		r.DeleteNonExistingUsers(r.db, data.Users)
		for _, group := range data.Groups {
			// Upsert the group.
			r.dbService.UpsertGroup(r.db, group)
			// Upsert its users_groups associations.
			for _, groupUser := range group.Users {
				r.dbService.AddUserToGroup(r.db, groupUser)
			}
			// Delete users_groups entries which do not exist in the API response.
			r.DeleteNonExistingUG(r.db, group.Users)
		}
		// Delete groups from the DB which do not exist in the API response.
		r.DeleteNonExistingGroups(r.db, data.Groups)
	}
}
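The DB, DBService and APIService types are omitted above for brevity. As a rough sketch of what this example assumes (names and signatures here are illustrative, not our actual code), they could look like this:

type Tenant struct{ ID string }
type User struct{ ID, TenantID string }
type GroupUser struct{ UserID, GroupID string }
type Group struct {
	ID    string
	Users []GroupUser // user-group associations for this group
}
type TenantData struct {
	Users  []User
	Groups []Group
}

// APIService talks to the monolith over its API.
type APIService interface {
	FetchTenants() []Tenant
	FetchTenantData(t Tenant) TenantData
	// Paginated variant used in the pagination example further below.
	FetchUsers(t Tenant, page, pageSize int) []User
}

// DBService wraps the service's own persistence logic.
type DBService interface {
	UpsertTenant(db DB, t Tenant)
	UpsertUser(db DB, u User)
	UpsertGroup(db DB, g Group)
	AddUserToGroup(db DB, gu GroupUser)
	// Illustrative helpers used by the deletion sketch further below.
	ListUserIDs(db DB) []string
	DeleteUser(db DB, id string)
}

// DB stands for whatever database handle the service uses, e.g. *sql.DB or an ORM session.
type DB interface{}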
Let’s break down what we are doing here:
- We start by reconciling tenants. This is crucial because all other entities (users, groups) are associated with tenants, and due to DB constraints nothing else can be persisted until the tenants are present.
- For each tenant, we fetch all related data (users, groups, and user-group associations) in one API call and upsert them.
- Upsert all users, then delete non-existing users by iterating over the service’s DB and comparing with the preloaded API data (see the sketch after this list).
- Do the same for groups, additionally iterating over the user-group associations within each group.
- Finally, we delete any groups that no longer exist in the monolith.
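As a sketch of that deletion step: ListUserIDs and DeleteUser are the illustrative helpers mentioned earlier, and in the real code the listing is scoped to the tenant currently being reconciled and done in batches.

// DeleteNonExistingUsers removes users from the service's DB that are absent
// from the preloaded API data.
func (r *Reconciler) DeleteNonExistingUsers(db DB, apiUsers []User) {
	// Build a lookup set of the IDs the monolith still knows about.
	existing := make(map[string]struct{}, len(apiUsers))
	for _, u := range apiUsers {
		existing[u.ID] = struct{}{}
	}
	// Walk the service's own records and drop anything missing from the API data.
	for _, id := range r.dbService.ListUserIDs(db) {
		if _, ok := existing[id]; !ok {
			r.dbService.DeleteUser(db, id)
		}
	}
}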
Data Fetching with Pagination
Fetching data should normally be done in a paginated fashion: if a tenant has a million users, we don’t want to fetch them all in a single call. A small example:
// Example of how to fetch entities in pages.
func (r *Reconciler) fetchPaginatedData(tenant Tenant) TenantData {
	var allData TenantData
	pageSize := 1000
	// Fetch users page by page until a short page signals the end.
	for page := 1; ; page++ {
		users := r.api.FetchUsers(tenant, page, pageSize)
		allData.Users = append(allData.Users, users...)
		if len(users) < pageSize {
			break
		}
	}
	// ... do the same for groups and users_groups ...
	return allData
}
This approach allows us to fetch data in manageable chunks, reducing the load on our API and keeping our application’s memory usage under control. However, it’s crucial to strike a balance between reducing API calls and managing memory usage. If your dataset is exceptionally large, you might need to process the data in batches rather than holding it all in memory: if the service’s memory can’t hold all the data for a tenant, it has to be processed part by part, as sketched below.
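One way to do that (shown only as a sketch, reusing the FetchUsers call from the example above) is to upsert each page as soon as it arrives and keep only the seen IDs in memory for the later deletion pass:

// reconcileUsersInPages upserts users page by page instead of accumulating the
// whole tenant in memory; only the set of seen IDs is retained so that the
// deletion pass can still detect users missing from the API.
func (r *Reconciler) reconcileUsersInPages(tenant Tenant) map[string]struct{} {
	seen := make(map[string]struct{})
	pageSize := 1000
	for page := 1; ; page++ {
		users := r.api.FetchUsers(tenant, page, pageSize)
		for _, u := range users {
			r.dbService.UpsertUser(r.db, u)
			seen[u.ID] = struct{}{}
		}
		if len(users) < pageSize {
			break
		}
	}
	// The returned set is later compared against the service's DB to delete stale users.
	return seen
}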
Handling Kafka Messages During Reconciliation
When running the reconciliation process, it’s critical to ensure that we don’t lose any incoming data changes. To achieve this, we implemented the following strategy:
- Stop consuming messages from Kafka when reconciliation starts.
- Continue sending messages to Kafka as normal.
- Allow messages to stack up in Kafka until reconciliation is complete.
- Resume normal operation after reconciliation, processing the backlog of messages.
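How the pause is implemented depends on the Kafka client library in use. As a library-agnostic sketch, assuming the service wraps its consumer behind a small interface with Pause and Resume, the flow around reconciliation could look like this:

// Consumer is an assumed thin wrapper around whatever Kafka client the service uses.
type Consumer interface {
	Pause()  // stop polling new messages; offsets stay committed
	Resume() // continue consuming from the last committed offset
}

// RunWithReconciliation pauses consumption, reconciles, and then lets the
// consumer work through the backlog that accumulated in the topic meanwhile.
func RunWithReconciliation(c Consumer, r *Reconciler) {
	c.Pause()
	defer c.Resume() // the backlog is processed once reconciliation finishes
	r.Start()
}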
Consistency in Data Operations: The Upsert Paradigm
A critical aspect of our reconciliation process that deserves special attention is the handling of upserts. When updating or inserting data during reconciliation, it’s crucial to use the same methods and logic that are employed during normal operation, ideally the same methods in the code. This consistency ensures that all business rules, data validations, and side effects are properly maintained, regardless of whether the data is being processed as part of routine operations or during a reconciliation event.
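In other words, the Kafka handler and the reconciler should call into the same persistence layer rather than each issuing their own writes. A minimal sketch of that idea (the Service type and handler name are illustrative):

// Service is the illustrative owner of the normal Kafka consumption path.
type Service struct {
	db        DB
	dbService DBService
}

// handleUserEvent is the regular Kafka path for a user-change message.
func (s *Service) handleUserEvent(u User) {
	// Exactly the same DBService.UpsertUser call the reconciliation loop uses,
	// so business rules, validations and side effects are applied identically.
	s.dbService.UpsertUser(s.db, u)
}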
Reconciliation in Multi-Cluster Environments
When operating in a multi-cluster environment, it’s crucial to ensure that only one instance performs the reconciliation to avoid conflicts and data inconsistencies.
The simplest solution is to reduce the number of instances to one during the reconciliation process. This can be achieved through your deployment or orchestration system. A more sophisticated approach is to implement a distributed locking mechanism, which allows one instance to acquire a lock and perform the reconciliation while the other instances wait. In both cases, every node must stop processing incoming data until the reconciliation is done.
Both approaches have their pros and cons. Reducing the instance count is simpler but may impact your service’s availability. The distributed locking approach maintains your service’s capacity but adds complexity to the system and typically requires an external coordination system such as ZooKeeper.
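As one possible illustration, if the instances already share a PostgreSQL database, an advisory lock can serve as the distributed lock. This is a minimal sketch under that assumption, not a full election mechanism:

import (
	"context"
	"database/sql"
)

// An arbitrary, application-chosen key identifying "the reconciliation lock".
const reconciliationLockKey = 42

// withReconciliationLock runs fn only if this instance wins the advisory lock.
// Advisory locks are session-scoped, so one pooled connection is pinned for the
// whole duration and the lock is released explicitly before the connection is
// returned to the pool.
func withReconciliationLock(ctx context.Context, db *sql.DB, fn func() error) (ran bool, err error) {
	conn, err := db.Conn(ctx)
	if err != nil {
		return false, err
	}
	defer conn.Close()

	var acquired bool
	// Non-blocking attempt: returns false immediately if another instance holds the lock.
	if err := conn.QueryRowContext(ctx,
		"SELECT pg_try_advisory_lock($1)", reconciliationLockKey).Scan(&acquired); err != nil {
		return false, err
	}
	if !acquired {
		return false, nil
	}
	// Release explicitly: Close() only returns the connection to the pool and
	// does not end the Postgres session that holds the lock.
	defer conn.ExecContext(ctx, "SELECT pg_advisory_unlock($1)", reconciliationLockKey)

	return true, fn()
}

Instances that do not win the lock should still keep their Kafka consumption paused until the winning instance finishes, as noted above.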
Lessons Learned
Implementing this reconciliation system taught us several valuable lessons:
- Always design for eventual consistency in distributed systems. Perfect real-time consistency is often impractical and can lead to performance issues.
- Use idempotent operations (like upserts) wherever possible. They make your system more resilient to retries and duplicates.
- Always have a way to verify and correct your data. Automated reconciliation processes like this one can be lifesavers.
- Use APIs for inter-service communication rather than direct database access. It provides better encapsulation and maintainability.
Conclusion
Data reconciliation in distributed systems presents numerous challenges, from efficient data fetching to maintaining data integrity during the process. By implementing pagination, carefully managing Kafka message flow, and addressing multi-cluster scenarios, we’ve built a robust reconciliation system that can handle large-scale data with minimal disruption to our services.
As distributed systems continue to grow in complexity, techniques like these will become increasingly important. By sharing our experiences and solutions, we can collectively build more resilient and scalable systems that can adapt to the ever-changing landscape of modern software architecture.