Replicating from Aurora for PostgreSQL via Debezium
Debezium is a popular platform for Change Data Capture (CDC).
This guide shows you how to replicate tables from your transactional Amazon Aurora PostgreSQL cluster to CedarDB, allowing you to run fast analytics on data as it comes in without impacting your existing data infrastructure.
Setting up Replication
Starting an EC2 Instance
CedarDB and Debezium will live inside this instance.
We recommend the m6id.2xlarge instance type, which comes with 32 GiB of memory, 8 vCPUs, and 500 GiB of fast ephemeral SSD. If you want to spend less, going for something with 4 vCPUs is also fine.
The rest of this guide assumes you use Ubuntu 24.04 as your operating system. Since CedarDB runs inside its own docker image, you can choose any other OS as well, but you might have to adapt the installation instructions accordingly.
Setting up your EC2 Instance
CedarDB loves fast SSDs. If your instance comes with an ephemeral SSD, mount it like this:
sudo mkfs.ext4 -E nodiscard /dev/nvme1n1
mkdir /home/ubuntu/db
sudo mount -o discard /dev/nvme1n1 /home/ubuntu/db
sudo chown ubuntu:ubuntu /home/ubuntu/db
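You can quickly confirm that the mount worked:

df -h /home/ubuntu/db

The output should show /dev/nvme1n1 mounted at /home/ubuntu/db.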
Next, we install docker:
sudo apt update
sudo apt install ca-certificates curl
sudo install -m 0755 -d /etc/apt/keyrings
sudo curl -fsSL https://download.docker.com/linux/ubuntu/gpg -o /etc/apt/keyrings/docker.asc
sudo chmod a+r /etc/apt/keyrings/docker.asc
echo \
"deb [arch=$(dpkg --print-architecture) signed-by=/etc/apt/keyrings/docker.asc] https://download.docker.com/linux/ubuntu \
$(. /etc/os-release && echo "$VERSION_CODENAME") stable" | \
sudo tee /etc/apt/sources.list.d/docker.list > /dev/null
sudo apt update
sudo apt install docker-ce docker-ce-cli containerd.io docker-buildx-plugin docker-compose-plugin
Before you can run docker commands, you need to add your user to the docker group and log in again:
sudo adduser ubuntu docker
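Once you have logged in again, you can verify that docker works without sudo:

docker run hello-world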
Finally, build the docker image using the CedarDB Dockerfile.
docker build --tag cedardb .
Starting an Amazon Aurora PostgreSQL Cluster
If you already have a cluster, you can skip this step.
We recommend the db.r6gd.xlarge instance type, which comes with 32 GiB of memory and 4 vCPUs. You can definitely go cheaper here if you just want to play around a little bit.
Make sure to connect it to your EC2 instance.
In this example, we will assume you have created an admin user postgresuser with password postgrespw.
Configuring your Amazon Aurora PostgreSQL Cluster
Amazon Aurora PostgreSQL needs to be configured for logical replication before Debezium can stream changes from it. You can take a look at the Debezium documentation for details and for instructions on how to check whether your cluster is already set up correctly.
If not, here are the steps to enable it:
1. Create a new parameter group for your cluster. Call it, e.g., logicalreplication, set the engine type to Aurora PostgreSQL, the family to your PostgreSQL version, e.g. aurora-postgresql15, and the type to DB Cluster Parameter Group.
2. Within that parameter group, change the parameter rds.logical_replication to 1.
3. Apply this group to your cluster.
4. Restart your cluster (or wait for the next maintenance window).
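Once the cluster is back up, you can verify that logical replication is enabled. A quick check with psql, using the credentials from this guide:

psql -h [Your Amazon Aurora PostgreSQL hostname] -U postgresuser -d postgres -c "SHOW wal_level;"

This should print logical; if it still prints replica, the parameter group has not taken effect yet.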
Starting CedarDB and Debezium
Create a file docker-compose.yml with the following content:
services:
  zookeeper:
    image: quay.io/debezium/zookeeper:2.7
    ports:
      - 2181:2181
      - 2888:2888
      - 3888:3888
  kafka:
    image: quay.io/debezium/kafka:2.7
    ports:
      - 9092:9092
    links:
      - zookeeper
    environment:
      - ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_AUTO_CREATE_TOPICS_ENABLE=true
  cedardb:
    image: cedardb:latest
    ports:
      - 5433:5432
    volumes:
      - type: bind
        source: /home/ubuntu/db
        target: /var/lib/cedardb/data
    environment:
      - CEDAR_USER=postgres
      - CEDAR_PASSWORD=postgres
      - CEDAR_DB=postgres
  connect:
    image: quay.io/debezium/connect:2.7
    ports:
      - 8083:8083
      - 1976:1976
    links:
      - kafka
      - cedardb
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=1
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - STATUS_STORAGE_TOPIC=my_connect_statuses
Make sure that your docker container database is created somewhere on a fast SSD, i.e., modify the docker volumes of CedarDB in the docker-compose configuration accordingly.
If you followed this guide, /home/ubuntu/db should point to your fast ephemeral SSD (if your instance has one).
Then, start all services with the following command:
docker compose up
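If you prefer to keep the services running in the background, you can start them detached and follow the Debezium Connect log separately:

docker compose up -d
docker compose logs -f connect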
Install psql to talk to CedarDB and Postgres:
sudo apt install postgresql-common postgresql-client-16
Creating a Source and Sink Configuration for Debezium
We now need to connect Debezium to Amazon Aurora PostgreSQL via a source connector and to CedarDB via a sink connector.
Create a file source.json with the following contents:
{
  "name": "postgres-source",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "[Your Amazon Aurora PostgreSQL hostname]",
    "plugin.name": "pgoutput",
    "database.port": "5432",
    "database.user": "postgresuser",
    "database.password": "postgrespw",
    "database.dbname": "postgres",
    "topic.prefix": "postgres",
    "heartbeat.interval.ms": "20",
    "table.include.list": "public.lineitem",
    "topic.creation.enable": "true",
    "topic.creation.default.replication.factor": "-1",
    "topic.creation.default.partitions": "-1",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "false",
    "transforms.unwrap.delete.handling.mode": "none"
  }
}
Create a file sink.json with the following contents:
{
  "name": "cedar-sink",
  "config": {
    "connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",
    "topics.regex": "postgres.public.lineitem",
    "connection.url": "jdbc:postgresql://cedardb:5432/postgres?stringtype=unspecified",
    "connection.username": "postgres",
    "connection.password": "postgres",
    "delete.handling.mode": "none",
    "insert.mode": "upsert",
    "schema.evolution": "basic",
    "delete.enabled": "true",
    "primary.key.mode": "record_key",
    "primary.key.fields": "lineitem_id",
    "table.name.format": "${source.table}"
  }
}
This configuration assumes we want to replicate a table called lineitem with a primary key called lineitem_id. Modify both files to work with your Amazon Aurora PostgreSQL and CedarDB credentials.
Starting Source and Sink
Execute the following commands to register the source and sink with Debezium and start them:
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @sink.json
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @source.json
It’s possible that you need to restart all containers (docker compose down, then docker compose up) for the replication to start after creating the connectors.
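You can check that both connectors are registered and running via the Kafka Connect REST API:

curl http://localhost:8083/connectors
curl http://localhost:8083/connectors/postgres-source/status
curl http://localhost:8083/connectors/cedar-sink/status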
If you want to delete the source and sink, you can use the following commands:
curl -i -X DELETE -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/postgres-source
curl -i -X DELETE -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/cedar-sink
Testing the Replication
Congratulations, you’re now ready to go! Let’s test if everything is working correctly.
Creating a Table in Amazon Aurora for PostgreSQL
Connect to your cluster (e.g., via psql -h [...].rds.amazonaws.com -U postgresuser -p 5432 -d postgres
) and paste the following:
CREATE TABLE lineitem (
lineitem_id BIGINT PRIMARY KEY, -- Unique identifier for each line item
transaction_id BIGINT NOT NULL, -- Links the line item to a specific transaction (e.g., order_id or invoice_id)
product_id BIGINT NOT NULL, -- ID of the product or service being transacted
quantity NUMERIC(10, 2) NOT NULL, -- Quantity of the product being ordered
unit_price NUMERIC(15, 4) NOT NULL, -- Price per unit of the product
discount NUMERIC(5, 2) DEFAULT 0.00, -- Discount applied to the line item, if any
tax_rate NUMERIC(5, 2) DEFAULT 0.00, -- Applicable tax rate
total_amount NUMERIC(20, 4) GENERATED ALWAYS AS (quantity * unit_price * (1 - discount / 100) * (1 + tax_rate / 100)) STORED, -- Calculated total amount for the line item
created_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP, -- Timestamp when the line item was created
transaction_date DATE NOT NULL, -- Date of the transaction
status VARCHAR(50) DEFAULT 'active', -- Status of the line item (e.g., 'active', 'void', 'pending')
notes TEXT -- Optional notes or comments about the line item
);
INSERT INTO lineitem (lineitem_id, transaction_id, product_id, quantity, unit_price, discount, tax_rate, transaction_date, status, notes
) VALUES
(1, 1001, 2001, 10.00, 50.00, 5.00, 10.00, '2024-10-15', 'active', 'First line item for this transaction'),
(2, 1001, 2002, 5.00, 100.00, 0.00, 15.00, '2024-10-15', 'active', 'Second line item with no discount'),
(3, 1002, 2003, 2.00, 250.00, 10.00, 12.00, '2024-10-16', 'pending', 'Third line item in a different transaction'),
(4, 1003, 2004, 7.00, 75.00, 3.00, 8.00, '2024-10-16', 'void', 'Voided line item for test purposes');
Checking Replication in CedarDB
Now connect to CedarDB (e.g., via PGPASSWORD=postgres psql -h localhost -U postgres
) and check the replicated table:
select * from lineitem;
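If everything is wired up correctly, the four rows inserted above should appear. A quick sanity check:

select count(*) from lineitem; -- should return 4 once the initial rows have been replicated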
Automate
Let’s create some more rows! Create a file inserter.py with the following content:
import psycopg2
from psycopg2 import sql
import random
import time

# PostgreSQL connection parameters
conn = psycopg2.connect(
    dbname="postgres",
    user="postgresuser",
    password="postgrespw",
    host="[Your Amazon Aurora PostgreSQL hostname]",
    port="5432"
)

# Create a cursor object
cur = conn.cursor()

# Function to insert dummy lineitem records
def insert_dummy_lineitem(lineitem_id):
    transaction_id = random.randint(1000, 1100)     # Random transaction_id between 1000 and 1100
    product_id = random.randint(2000, 2100)         # Random product_id between 2000 and 2100
    quantity = round(random.uniform(1, 20), 2)      # Random quantity between 1 and 20
    unit_price = round(random.uniform(10, 200), 2)  # Random unit price between 10 and 200
    discount = round(random.uniform(0, 10), 2)      # Random discount between 0% and 10%
    tax_rate = round(random.uniform(5, 15), 2)      # Random tax rate between 5% and 15%
    transaction_date = time.strftime('%Y-%m-%d')    # Current date
    status = random.choice(['active', 'pending', 'void'])  # Random status
    notes = f"Random note {random.randint(1, 100)}"  # Random notes

    # Insert statement
    insert_query = sql.SQL("""
        INSERT INTO lineitem (
            lineitem_id, transaction_id, product_id, quantity, unit_price, discount, tax_rate, transaction_date, status, notes
        ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
    """)

    # Execute insert statement
    cur.execute(insert_query, (
        lineitem_id, transaction_id, product_id, quantity, unit_price, discount, tax_rate, transaction_date, status, notes
    ))

    # Commit transaction
    conn.commit()

# Insert dummy data every few milliseconds
try:
    lineitem_id = 5
    while True:
        insert_dummy_lineitem(lineitem_id)
        print("Inserted a new lineitem.")
        lineitem_id = lineitem_id + 1
        time.sleep(0.01)  # brief pause so we insert every few milliseconds
except KeyboardInterrupt:
    print("Insertion process stopped.")
finally:
    # Close the cursor and connection
    cur.close()
    conn.close()
It requires psycopg2, which you can install via sudo apt install python3-psycopg2. Then run python3 inserter.py.
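While the inserter is running, you can watch the rows arrive in CedarDB, for example with psql’s \watch command:

PGPASSWORD=postgres psql -h localhost -U postgres
select count(*) from lineitem;
\watch 1

The count should keep climbing as Debezium replicates the new rows.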
Running Analytical Queries
Connect to CedarDB again and run your analytical queries. Let’s find the average tax rate for each product:
SELECT
product_id,
AVG(tax_rate) AS avg_tax_rate
FROM
lineitem
GROUP BY
product_id
ORDER BY
avg_tax_rate DESC;
You can now run all your expensive analytical queries against CedarDB while keeping your PostgreSQL database as the system of record.
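As another example, here is the gross revenue per product. Note that PostgreSQL’s logical replication skips generated columns such as total_amount, so the query computes the total on the fly:

SELECT
    product_id,
    SUM(quantity * unit_price * (1 - discount / 100) * (1 + tax_rate / 100)) AS gross_revenue
FROM
    lineitem
GROUP BY
    product_id
ORDER BY
    gross_revenue DESC;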