Skip to content

Commit

Permalink
feat: add pub/sub support
Browse files Browse the repository at this point in the history
  • Loading branch information
allisson committed Jan 2, 2024
1 parent 48b9dac commit 4fcd4cb
Show file tree
Hide file tree
Showing 48 changed files with 3,811 additions and 75 deletions.
1 change: 1 addition & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ on:
push:
branches:
- "*"
- "!main"

env:
PSQLQUEUE_TEST_DATABASE_URL: "postgres://psqlqueue:[email protected]:5432/psqlqueue-test?sslmode=disable"
Expand Down
220 changes: 219 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

Simple queue system powered by Golang and PostgreSQL.

## quickstart
## Quickstart

The idea of this service is to offer a simple queuing system using PostgreSQL as a backend.

Expand All @@ -21,6 +21,8 @@ docker run --name postgres-psqlqueue \
-d postgres:15-alpine
```

Before running anything, take a look at the possible environment variables that can be used with this service: https://github.com/allisson/psqlqueue/blob/main/env.sample

Now let's run the database migrations before starting the server:

```bash
Expand Down Expand Up @@ -180,3 +182,219 @@ curl --location --request PUT 'http://localhost:8000/v1/queues/my-new-queue/clea
```

This is the basics of using this service, I recommend that you check the swagger documentation at http://localhost:8000/v1/swagger/index.html to see more options.

## Pub/Sub mode

It's possible to use a Pub/Sub approach with the topics/subscriptions endpoints.

For this example let's imagine an event system that processes orders, we will have a topic called `orders` and will push some messages into this topic.

For creating a new topic we have these fields:
- "id": The identifier of this new topic.

```bash
curl --location 'http://localhost:8000/v1/topics' \
--header 'Content-Type: application/json' \
--data '{
"id": "orders"
}'
```

```json
{
"id": "orders",
"created_at": "2024-01-02T22:20:43.351647Z"
}
```

Now we will create two new queues:
- "all-orders": For receiving all messages from the orders topic.
- "processed-orders": For receiving only the messages with the `status` attribute equals to `"processed"`.

```bash
curl --location 'http://localhost:8000/v1/queues' \
--header 'Content-Type: application/json' \
--data '{
"id": "all-orders",
"ack_deadline_seconds": 30,
"message_retention_seconds": 1209600,
"delivery_delay_seconds": 0
}'
```

```json
{
"id": "all-orders",
"ack_deadline_seconds": 30,
"message_retention_seconds": 1209600,
"delivery_delay_seconds": 0,
"created_at": "2024-01-02T22:24:58.219593Z",
"updated_at": "2024-01-02T22:24:58.219593Z"
}
```

```bash
curl --location 'http://localhost:8000/v1/queues' \
--header 'Content-Type: application/json' \
--data '{
"id": "processed-orders",
"ack_deadline_seconds": 30,
"message_retention_seconds": 1209600,
"delivery_delay_seconds": 0
}'
```

```json
{
"id": "processed-orders",
"ack_deadline_seconds": 30,
"message_retention_seconds": 1209600,
"delivery_delay_seconds": 0,
"created_at": "2024-01-02T22:25:28.472891Z",
"updated_at": "2024-01-02T22:25:28.472891Z"
}
```

Now we will create two subscriptions to link the topic with the queue.

For creating a new subscription we have these fields:
- "id": The identifier of this new subscription.
- "topic_id": The id of the topic.
- "queue_id": The id of the queue.
- "message_filters": The filter for use with the message attributes.

Creating the first subscription:

```bash
curl --location 'http://localhost:8000/v1/subscriptions' \
--header 'Content-Type: application/json' \
--data '{
"id": "orders-to-all-orders",
"topic_id": "orders",
"queue_id": "all-orders"
}'
```

```json
{
"id": "orders-to-all-orders",
"topic_id": "orders",
"queue_id": "all-orders",
"message_filters": null,
"created_at": "2024-01-02T22:30:12.628323Z"
}
```

Now creating the second subscription, for this one we will use the message_filters field:

```bash
curl --location 'http://localhost:8000/v1/subscriptions' \
--header 'Content-Type: application/json' \
--data '{
"id": "orders-to-processed-orders",
"topic_id": "orders",
"queue_id": "processed-orders",
"message_filters": {"status": ["processed"]}
}'
```

```json
{
"id": "orders-to-processed-orders",
"topic_id": "orders",
"queue_id": "processed-orders",
"message_filters": {
"status": [
"processed"
]
},
"created_at": "2024-01-02T22:31:26.156692Z"
}
```

Now it's time to publish the first message:

```bash
curl --location 'http://localhost:8000/v1/topics/orders/messages' \
--header 'Content-Type: application/json' \
--data '{
"body": "body-of-the-order",
"attributes": {"status": "created"}
}'
```

And the second message:

```bash
curl --location 'http://localhost:8000/v1/topics/orders/messages' \
--header 'Content-Type: application/json' \
--data '{
"body": "body-of-the-order",
"attributes": {"status": "processed"}
}'
```

Now we will consume the `all-orders` queue:

```bash
curl --location 'http://localhost:8000/v1/queues/all-orders/messages'
```

```json
{
"data": [
{
"id": "01HK651Q52EZMPKBYZGVK0ZX8S",
"queue_id": "all-orders",
"label": null,
"body": "body-of-the-order",
"attributes": {
"status": "created"
},
"delivery_attempts": 1,
"created_at": "2024-01-02T19:35:00.635625-03:00"
},
{
"id": "01HK652W2HNW53XWV4QBT5MAJY",
"queue_id": "all-orders",
"label": null,
"body": "body-of-the-order",
"attributes": {
"status": "processed"
},
"delivery_attempts": 1,
"created_at": "2024-01-02T19:35:38.446759-03:00"
}
],
"limit": 10
}
```

As expected, this queue has the two published messages.

Now we will consume the `processed-orders` queue:

```bash
curl --location 'http://localhost:8000/v1/queues/processed-orders/messages'
```

```json
{
"data": [
{
"id": "01HK652W2JK8MPN3JDXY9RATS5",
"queue_id": "processed-orders",
"label": null,
"body": "body-of-the-order",
"attributes": {
"status": "processed"
},
"delivery_attempts": 1,
"created_at": "2024-01-02T19:35:38.446759-03:00"
}
],
"limit": 10
}
```

As expected, this queue has only one message that was published with the `status` attribute equal to `"processed"`.
8 changes: 7 additions & 1 deletion cmd/psqlqueue/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,17 +45,23 @@ func main() {
// repositories
queueRepository := repository.NewQueue(pool)
messageRepository := repository.NewMessage(pool)
topicRepository := repository.NewTopic(pool)
subscriptionRepository := repository.NewSubscription(pool)

// services
queueService := service.NewQueue(queueRepository)
messageService := service.NewMessage(messageRepository, queueRepository)
topicService := service.NewTopic(topicRepository, subscriptionRepository, queueRepository, messageRepository)
subscriptionService := service.NewSubscription(subscriptionRepository)

// http handlers
queueHandler := http.NewQueueHandler(queueService)
messageHandler := http.NewMessageHandler(messageService)
topicHandler := http.NewTopicHandler(topicService)
subscriptionHandler := http.NewSubscriptionHandler(subscriptionService)

// run http server
http.RunServer(c.Context, cfg, http.SetupRouter(logger, queueHandler, messageHandler))
http.RunServer(c.Context, cfg, http.SetupRouter(logger, queueHandler, messageHandler, topicHandler, subscriptionHandler))

return nil
},
Expand Down
2 changes: 2 additions & 0 deletions db/migrations/000002_create_pubsub_tables.down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
DROP TABLE IF EXISTS subscriptions;
DROP TABLE IF EXISTS topics;
15 changes: 15 additions & 0 deletions db/migrations/000002_create_pubsub_tables.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
CREATE TABLE IF NOT EXISTS topics(
id VARCHAR PRIMARY KEY NOT NULL,
created_at TIMESTAMPTZ NOT NULL
);

CREATE TABLE IF NOT EXISTS subscriptions(
id VARCHAR PRIMARY KEY NOT NULL,
topic_id VARCHAR NOT NULL,
queue_id VARCHAR NOT NULL,
message_filters JSONB,
created_at TIMESTAMPTZ NOT NULL,
FOREIGN KEY (topic_id) REFERENCES topics (id) ON DELETE CASCADE,
FOREIGN KEY (queue_id) REFERENCES queues (id) ON DELETE CASCADE
);
CREATE UNIQUE INDEX IF NOT EXISTS subscriptions_topic_id_queue_id_idx ON subscriptions (topic_id, queue_id);
Loading

0 comments on commit 4fcd4cb

Please sign in to comment.