Better Pub/Sub/Events mechanism inside kernelci-api #624

@nuclearcat

Right now we rely on Redis pub/sub, which has proven to be broken.
I propose replacing Redis pub/sub with a local SQLite database in WAL mode. This makes the events database persistent (currently a restart of either the API or Redis loses all events), and it also lets us implement a guaranteed event processing mechanism.

Database schema proposal:

For Messages

  • messages(id INTEGER PRIMARY KEY AUTOINCREMENT, topic TEXT, created_at DATETIME, expires_at DATETIME NULL, payload TEXT, owner TEXT, acked_at DATETIME NULL, acked_by TEXT NULL)

For Subscriptions

  • subscriptions(id INTEGER PRIMARY KEY AUTOINCREMENT, topic TEXT, user TEXT, promiscuous BOOLEAN, created_at DATETIME, last_poll DATETIME, last_message_id INTEGER)
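A minimal sketch of how the two tables above could be created, using Python's standard `sqlite3` module. The helper name `open_events_db` and the file name `events.db` are assumptions for illustration only; the column definitions are copied from the proposal. Note that WAL mode only applies to file-backed databases, so the demo uses a temporary directory rather than `:memory:`.

```python
import os
import sqlite3
import tempfile

# Column definitions copied from the proposal above.
SCHEMA = """
CREATE TABLE IF NOT EXISTS messages (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    topic TEXT,
    created_at DATETIME,
    expires_at DATETIME NULL,
    payload TEXT,
    owner TEXT,
    acked_at DATETIME NULL,
    acked_by TEXT NULL
);
CREATE TABLE IF NOT EXISTS subscriptions (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    topic TEXT,
    user TEXT,
    promiscuous BOOLEAN,
    created_at DATETIME,
    last_poll DATETIME,
    last_message_id INTEGER
);
"""


def open_events_db(path):
    """Open (or create) the events database in WAL mode (hypothetical helper)."""
    conn = sqlite3.connect(path)
    # WAL is a persistent, per-file setting; in-memory databases ignore it.
    conn.execute("PRAGMA journal_mode=WAL")
    conn.executescript(SCHEMA)
    return conn


# Demo against a throwaway file.
with tempfile.TemporaryDirectory() as tmp:
    conn = open_events_db(os.path.join(tmp, "events.db"))
    mode = conn.execute("PRAGMA journal_mode").fetchone()[0]
    tables = {
        row[0]
        for row in conn.execute("SELECT name FROM sqlite_master WHERE type = 'table'")
    }
    conn.close()
```

An index on `messages(topic, id)` would probably help the Listen queries, but that is not part of the proposal and is left out here.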

Core flows:

  • Publish
  • Subscribe
  • Listen
  • Ack (optional)
  • Cleanup (background task)
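The Cleanup flow is not detailed in this proposal. One possible reading, assuming it simply deletes rows whose `expires_at` has passed, is sketched below. The function name `cleanup_expired` and the `now` parameter are hypothetical, and timestamps are assumed to be stored as `YYYY-MM-DD HH:MM:SS` UTC strings so they compare correctly as text.

```python
import sqlite3


def cleanup_expired(conn, now=None):
    """Delete expired messages and return how many rows were removed.

    `now` is an optional 'YYYY-MM-DD HH:MM:SS' UTC string (assumption);
    when omitted, SQLite's own clock is used.
    """
    with conn:  # commits on success, rolls back on error
        cur = conn.execute(
            "DELETE FROM messages "
            "WHERE expires_at IS NOT NULL "
            "AND expires_at <= COALESCE(?, datetime('now'))",
            (now,),
        )
    return cur.rowcount


# Demo with fixed timestamps so the result does not depend on the clock.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE messages (id INTEGER PRIMARY KEY AUTOINCREMENT, topic TEXT, "
    "created_at DATETIME, expires_at DATETIME NULL, payload TEXT, owner TEXT, "
    "acked_at DATETIME NULL, acked_by TEXT NULL)"
)
conn.executemany(
    "INSERT INTO messages(topic, created_at, expires_at, payload, owner) "
    "VALUES (?, ?, ?, ?, ?)",
    [
        ("node", "2023-12-31 12:00:00", "2024-01-01 12:00:00", "{}", "alice"),  # expired
        ("node", "2024-01-01 18:00:00", "2024-01-02 18:00:00", "{}", "alice"),  # still valid
        ("node", "2024-01-01 18:00:00", None, "{}", "alice"),  # never expires
    ],
)
removed = cleanup_expired(conn, now="2024-01-02 00:00:00")
remaining = conn.execute("SELECT COUNT(*) FROM messages").fetchone()[0]
```

The background task would then only need to call this periodically; whether stale subscriptions should be cleaned up the same way is an open question.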

First stage: full compatibility. Goal: remove Redis from the equation and improve consistency.

  • Publish
    • INSERT INTO messages(topic, expires_at, owner, payload), with expires_at set to 24h from now.
  • Subscribe
    • INSERT INTO subscriptions(topic, user, promiscuous, created_at, last_poll, last_message_id). last_message_id is initialized to the id of the most recent message on that topic in messages.
  • Listen
    • promiscuous: SELECT * FROM messages WHERE topic=? AND id > subscriptions.last_message_id ORDER BY id LIMIT 1
    • non-promiscuous: SELECT * FROM messages WHERE topic=? AND id > subscriptions.last_message_id AND owner=subscriptions.user ORDER BY id LIMIT 1
    • In both cases, after sending the message to the client we update subscriptions.last_message_id.
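The first-stage flows above could be sketched roughly as follows, again with Python's standard `sqlite3`. The function names, the JSON encoding of `payload`, and the `ttl_hours` parameter are assumptions for illustration. The sketch also adds `ORDER BY id` so that `LIMIT 1` returns the oldest undelivered message, and it advances `last_message_id` inside `listen` itself; a real implementation would likely do that only after the message has actually been sent to the client.

```python
import json
import sqlite3

SCHEMA = """
CREATE TABLE messages (
    id INTEGER PRIMARY KEY AUTOINCREMENT, topic TEXT, created_at DATETIME,
    expires_at DATETIME NULL, payload TEXT, owner TEXT,
    acked_at DATETIME NULL, acked_by TEXT NULL);
CREATE TABLE subscriptions (
    id INTEGER PRIMARY KEY AUTOINCREMENT, topic TEXT, user TEXT,
    promiscuous BOOLEAN, created_at DATETIME, last_poll DATETIME,
    last_message_id INTEGER);
"""


def publish(conn, topic, owner, payload, ttl_hours=24):
    """Store a message that expires ttl_hours from now; returns its id."""
    with conn:
        cur = conn.execute(
            "INSERT INTO messages(topic, created_at, expires_at, owner, payload) "
            "VALUES (?, datetime('now'), datetime('now', ?), ?, ?)",
            (topic, f"+{ttl_hours} hours", owner, json.dumps(payload)),
        )
    return cur.lastrowid


def subscribe(conn, topic, user, promiscuous=False):
    """Create a subscription starting after the current last message on topic."""
    with conn:
        last_id = conn.execute(
            "SELECT COALESCE(MAX(id), 0) FROM messages WHERE topic = ?", (topic,)
        ).fetchone()[0]
        cur = conn.execute(
            "INSERT INTO subscriptions(topic, user, promiscuous, created_at, "
            "last_poll, last_message_id) "
            "VALUES (?, ?, ?, datetime('now'), datetime('now'), ?)",
            (topic, user, int(promiscuous), last_id),
        )
    return cur.lastrowid


def listen(conn, sub_id):
    """Return the next (id, payload) for a subscription, or None if nothing is new."""
    with conn:
        sub = conn.execute(
            "SELECT topic, user, promiscuous, last_message_id "
            "FROM subscriptions WHERE id = ?",
            (sub_id,),
        ).fetchone()
        if sub is None:
            raise KeyError(f"unknown subscription {sub_id}")
        topic, user, promiscuous, last_id = sub
        query = "SELECT id, payload FROM messages WHERE topic = ? AND id > ?"
        params = [topic, last_id]
        if not promiscuous:
            # Non-promiscuous subscribers only see messages they own.
            query += " AND owner = ?"
            params.append(user)
        row = conn.execute(query + " ORDER BY id LIMIT 1", params).fetchone()
        conn.execute(
            "UPDATE subscriptions SET last_poll = datetime('now') WHERE id = ?",
            (sub_id,),
        )
        if row is None:
            return None
        conn.execute(
            "UPDATE subscriptions SET last_message_id = ? WHERE id = ?",
            (row[0], sub_id),
        )
    return row[0], json.loads(row[1])


conn = sqlite3.connect(":memory:")
conn.executescript(SCHEMA)
publish(conn, "node", "alice", {"n": 0})  # published before anyone subscribed
sub_all = subscribe(conn, "node", "bob", promiscuous=True)
sub_own = subscribe(conn, "node", "alice")
publish(conn, "node", "carol", {"n": 1})
publish(conn, "node", "alice", {"n": 2})
first = listen(conn, sub_all)   # (2, {"n": 1})
second = listen(conn, sub_all)  # (3, {"n": 2})
third = listen(conn, sub_all)   # None, nothing new
own = listen(conn, sub_own)     # (3, {"n": 2}), carol's message is filtered out
```

Because each subscription keeps its own `last_message_id`, an API restart does not lose the position: the next `listen` continues from the stored id.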

Second stage: introduce guaranteed message processing via ACK.

  • Add unack_only flag to subscriptions?
  • Modify Listen to respect unack_only, so we receive only unacknowledged messages.
  • Add ACK command so we can acknowledge messages (set acked_at=now() and acked_by=user).
    TODO: a message might be received and ACKed by multiple users.
    Maybe make the ACK conditional so that only the first one wins: UPDATE messages SET acked_at=now(), acked_by=user WHERE id=? AND acked_at IS NULL
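A small sketch of the conditional ACK idea, assuming the caller wants to know whether its ACK actually won. The function name `ack` is hypothetical. The `acked_at IS NULL` guard makes the UPDATE a no-op for everyone after the first acknowledger, and the affected row count tells the caller which case it hit.

```python
import sqlite3


def ack(conn, message_id, user):
    """Acknowledge a message; True only for the first successful ACK."""
    with conn:
        cur = conn.execute(
            "UPDATE messages SET acked_at = datetime('now'), acked_by = ? "
            "WHERE id = ? AND acked_at IS NULL",
            (user, message_id),
        )
    # 0 rows changed means the message was already acked or does not exist.
    return cur.rowcount == 1


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE messages (id INTEGER PRIMARY KEY AUTOINCREMENT, topic TEXT, "
    "created_at DATETIME, expires_at DATETIME NULL, payload TEXT, owner TEXT, "
    "acked_at DATETIME NULL, acked_by TEXT NULL)"
)
conn.execute(
    "INSERT INTO messages(topic, created_at, payload, owner) "
    "VALUES ('node', datetime('now'), '{}', 'alice')"
)
first_ack = ack(conn, 1, "bob")     # True, bob wins
second_ack = ack(conn, 1, "carol")  # False, already acked
acked_by = conn.execute("SELECT acked_by FROM messages WHERE id = 1").fetchone()[0]
```

This does not stop two users from receiving the same message, it only guarantees a single recorded acknowledger; an `unack_only` Listen would additionally filter on `acked_at IS NULL`.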

Labels: bug, enhancement, techdebt
