Distributed Transaction In Processes
Let’s first talk about what is Database Transaction-
A transaction is a list of database operations that must be executed atomically. This atomicity makes data strongly consistent.
For instance, Bob wants to send Alice some money. So what are the set of operations that will happen to complete the transfer of money from Bob’s account to Alice’s account. First, there will be a read operation to know the current balance of Bob’s account. Let’s say Bob has $500 in his account and he wants to send Alice $200. So, the second operation would be deducting $200 from Bob’s account and assign the current balance as $300. And the final operation would be updating Alice’s account and add $200.
what if the system loses power between the three operations? When the system comes back online, it will be hard to find why the data is inconsistent. If the final operation fails and Alice’s account doesn’t update with added $200. There would be a inconsistent state in database. Bob will lose his money and Alice will not get her money. The money will just vanish in the thin air.
So, how do we solve this problem? There comes database transaction to rescue. Database transactions are atomic by nature. Either all operations succeed and their changes are committed, or that they fail without any side effects.
Now, we are aware about database transactions. Let’s talk about what is distributed transaction.
Distributed Transaction
In Microservice architecture, entire system/process is distributed into single independent sub-system/processes. Each processes has their own database. So when a transaction spans two or more processes to complete the transaction it is called a Distributed Transaction. Processes that executes a transaction in their database are called local transaction. To commit the distributed transaction, all the local transactions needs to be complete successfully.
There are two slightly different variants of distributed transactions.
- The first variant is one where the same piece of data needs to be updated in multiple replicas. This is the case where the whole database is essentially duplicated in multiple nodes, and a transaction needs to update all of them in an atomic way.
- The second variant is one where different pieces of data that reside in different processes need to be updated atomically. For instance, a financial application may use a partitioned database for the accounts of customers, where the balance of user A resides in process p1. In contrast, the balance of user B resides in process p2, and we want to transfer some money from user A to user B. We need to do this in an atomic way so that data is not lost (i.e., removed from user A, but not added in user B, because the transaction fails midway).
So, How do we manage atomicity and consistency in distributed transactions?
Here comes two phase commit protocol to solve this problem.
Two-phase commit
Two-phase commit (2PC) is a protocol used to implement atomic transaction commits across multiple processes. The protocol is split into two phases, prepare and commit. It assumes a process acts as coordinator and orchestrates the actions of the other processes, called participants. For example, the client application that initiates the transaction could act as the coordinator for the protocol.
When a coordinator wants to commit a transaction to the participants, it sends a prepare request asking the participants whether they are prepared to commit the transaction. If all participants reply that they are ready to commit, the coordinator sends out a commit message to all participants ordering them to do so. In contrast, if any process replies that it’s unable to commit, or doesn’t respond promptly, the coordinator sends an abort request to all participants.
Lets see how we can implement 2PC in Golang-
Let’s say we have a e-commerce site like Amazon. And to complete a order, we need to make two local transactions in payment service and shipment service.
-- Order service schema
CREATE TABLE orders (
id SERIAL PRIMARY KEY,
customer_id INT,
amount NUMERIC(10, 2),
status VARCHAR(20)
);
-- Payment service schema
CREATE TABLE payments (
id SERIAL PRIMARY KEY,
order_id INT,
amount NUMERIC(10, 2),
status VARCHAR(20)
);
-- Shipment service schema
CREATE TABLE shipments (
id SERIAL PRIMARY KEY,
order_id INT,
status VARCHAR(20)
);
Now, let’s implement the two-phase commit protocol.
First, we need to define a function that will perform the first phase of the commit, where we ask each service to prepare the transaction. If all services return “prepared”, we move on to the second phase. Otherwise, we abort the transaction.
func prepareTransaction(orderID int, paymentID int, shipmentID int, db *sql.DB) error {
tx, err := db.Begin()
if err != nil {
return err
}
// Tell each service to prepare the transaction
_, err = tx.Exec("SELECT * FROM prepare_order($1)", orderID)
if err != nil {
tx.Rollback()
return err
}
_, err = tx.Exec("SELECT * FROM prepare_payment($1)", paymentID)
if err != nil {
tx.Rollback()
return err
}
_, err = tx.Exec("SELECT * FROM prepare_shipment($1)", shipmentID)
if err != nil {
tx.Rollback()
return err
}
// If all services are prepared, commit the transaction
err = tx.Commit()
if err != nil {
return err
}
return nil
}
Next, we need to define a function that will perform the second phase of the commit, where we ask each service to commit the transaction. If all services return “committed”, we commit the transaction. Otherwise, we abort the transaction.
func commitTransaction(orderID int, paymentID int, shipmentID int, db *sql.DB) error {
tx, err := db.Begin()
if err != nil {
return err
}
// Tell each service to commit the transaction
_, err = tx.Exec("SELECT * FROM commit_order($1)", orderID)
if err != nil {
tx.Rollback()
return err
}
_, err = tx.Exec("SELECT * FROM commit_payment($1)", paymentID)
if err != nil {
tx.Rollback()
return err
}
_, err = tx.Exec("SELECT * FROM commit_shipment($1)", shipmentID)
if err != nil {
tx.Rollback()
return err
}
// If all services are committed, commit the transaction
err = tx.Commit()
if err != nil {
return err
}
return nil
}
But it has some cons as well. 2PC is a synchronous blocking protocol; if any of the participants isn’t available, the transaction can’t make any progress, and the application blocks completely. The assumption is that the coordi- nator and the participants are available and that the duration of the transaction is short-lived. Because of its blocking nature, 2PC is generally combined with a blocking concurrency control mechanism, like 2PL, to provide isolation.
But, some types of transactions can take hours to execute, in which case blocking just isn’t an option. And some transactions don’t need isolation in the first place. Suppose we were to drop the isolation requirement and the assumption that the transactions are short-lived. Can we come up with an asynchronous non-blocking solution that still provides atomicity?
SAGA Pattern
A saga is a distributed transaction composed of a set of local transactions 𝑇1, 𝑇2, …, 𝑇𝑛, where 𝑇𝑖 has a corresponding compensating local transaction 𝐶𝑖 used to undo its changes. The Saga guarantees that either all local transactions succeed, or in case of failure, that the compensating local transactions undo the partial execution of the transaction altogether. This guarantees the atomicity of the protocol; either all local transactions succeed, or none of them do.
A Saga can be implemented with an orchestrator, the transaction’s coordinator, that manages the execution of the local transactions across the processes involved, the transaction’s participants.
Let’s take the same example of e-commerce like Amazon. To complete an order we need to make payment and then we need to make shipment to the customer.
At a high level, the Saga can be implemented with the workflow depicted in the diagram:
- The coordinator initiates the transaction by sending a making request to the payment service (𝑇1). If the booking fails, no harm is done, and the coordinator marks the transaction as aborted.
- If the payment making succeeds, the coordinator sends a shipping request to the shipment service (𝑇2). If the request succeeds, the transaction is marked as successful, and we are all done.
- If the shipping request fails, the transaction needs to be aborted. The coordinator sends a cancellation request to the payment service to refund the payment it previously made(𝐶1). Without the cancellation, the transaction would be left in an inconsistent state, which would break its atomicity guarantee.
The coordinator can communicate asynchronously with the participants via message channels to tolerate temporarily unavailable ones. As the transaction requires multiple steps to succeed, and the coordinator can fail at any time, it needs to persist the state of the transaction as it advances.
Instead of calling another service in the middle of the transaction, let the service do its job within its scope and publishes the status through a message broker. That’s all. No long, synchronous, blocking call somewhere in the middle of the transaction. We will use RabbitMQ as the broker in this example for a reason that will become apparent later.
Lets see an example in Golang-
First, let’s define the necessary data models:
// Order represents an order in the e-commerce application
type Order struct {
ID string
CustomerID string
ProductID string
Quantity int
Status string
}
// Payment represents a payment for an order
type Payment struct {
ID string
OrderID string
Amount float64
Status string
}
// Shipping represents a shipment of an order
type Shipping struct {
ID string
OrderID string
Address string
Status string
}
Next, we’ll create the order service that publishes an order-created event to RabbitMQ when a new order is created:
// OrderService represents the service responsible for creating orders
type OrderService struct {
db *sql.DB
channel *amqp.Channel
}
// CreateOrder creates a new order and publishes an order-created event to RabbitMQ
func (s *OrderService) CreateOrder(order Order) error {
tx, err := s.db.Begin()
if err != nil {
return err
}
defer tx.Rollback()
// Insert the order into the database
_, err = tx.Exec("INSERT INTO orders (id, customer_id, product_id, quantity, status) VALUES ($1, $2, $3, $4, $5)", order.ID, order.CustomerID, order.ProductID, order.Quantity, order.Status)
if err != nil {
return err
}
// Publish an order-created event to RabbitMQ
orderCreated := map[string]interface{}{
"id": order.ID,
"customer_id": order.CustomerID,
"product_id": order.ProductID,
"quantity": order.Quantity,
"status": order.Status,
}
err = s.channel.Publish("orders", "order.created", false, false, amqp.Publishing{
ContentType: "application/json",
Body: json.Marshal(orderCreated),
})
if err != nil {
return err
}
// Commit the transaction
err = tx.Commit()
if err != nil {
return err
}
return nil
}
The payment and shipping services will subscribe to the order-created event and perform their respective actions when it is received.
// PaymentService represents the service responsible for processing payments
type PaymentService struct {
db *sql.DB
channel *amqp.Channel
}
// StartPaymentProcessing starts listening for order-created events and processes payments when they are received
func (s *PaymentService) StartPaymentProcessing() error {
// Create a queue and bind it to the "order.created" exchange
queue, err := s.channel.QueueDeclare("payments", false, false, false, false, nil)
if err != nil {
return err
}
err = s.channel.QueueBind(queue.Name, "order.created", "orders", false, nil)
if err != nil {
return err
}
// Start consuming messages from the queue
msgs, err := s.channel.Consume(queue.Name, "", true, false, false, false, nil)
if err != nil {
return err
}
// Process messages
for msg := range msgs {
orderCreated := make(map[string]interface{})
err := json.Unmarshal(msg.Body, &orderCreated)
if err != nil {
return err
}
// Process the payment
}
}
// PaymentService represents the service responsible for processing payments
type ShipmentService struct {
db *sql.DB
channel *amqp.Channel
}
// StartPaymentProcessing starts listening for order-created events and processes payments when they are received
func (s * ShipmentService) StartShipmentProcessing() error {
// Create a queue and bind it to the "order.created" exchange
queue, err := s.channel.QueueDeclare("shipping", false, false, false, false, nil)
if err != nil {
return err
}
err = s.channel.QueueBind(queue.Name, "shipping.created", "shipping", false, nil)
if err != nil {
return err
}
// Start consuming messages from the queue
msgs, err := s.channel.Consume(queue.Name, "", true, false, false, false, nil)
if err != nil {
return err
}
// Process messages
for msg := range msgs {
orderCreated := make(map[string]interface{})
err := json.Unmarshal(msg.Body, &orderCreated)
if err != nil {
return err
}
// Process the shipment
}
}
That’s how Saga pattern brings the asynchronous behaviour in the distributed translation.
Hope you liked reading this article and learned something. Happy Reading :)