Custom Workers
Custom workers let you process background messages from Pub/Sub topics. Use them for asynchronous processing, event-driven integrations, or offloading long-running tasks from request handlers.
Registering a worker handler
Use sdk.registerWorkerHandler() inside your module's register function.
Basic example
import { ConvoticModule } from "@convotic/block-sdk";
const module: ConvoticModule = {
name: "order-processor",
version: "1.0.0",
register(sdk) {
sdk.registerWorkerHandler({
topic: "custom.orders.new",
handler: async (message, ctx) => {
const order = message.data;
ctx.logger.info("Processing order", { orderId: order.id });
// Find or create the contact
let contact = await ctx.contacts.findByExternalId(order.customer_id);
if (!contact) {
contact = await ctx.contacts.create({
externalId: order.customer_id,
firstName: order.customer_name,
email: order.customer_email,
});
}
// Tag the contact
await ctx.contacts.addTag(contact.id, "has-order");
// Send a confirmation message if the contact has a conversation
const conversation = await ctx.conversations.findByContactId(contact.id);
if (conversation) {
await ctx.messages.send(conversation.id, {
text: `Thank you for your order #${order.id}! We'll keep you updated.`,
});
}
},
});
},
};
export default module;
Worker handler signature
sdk.registerWorkerHandler({
topic: string,
options?: WorkerOptions,
handler: (message: WorkerMessage, ctx: ConvoticContext) => Promise<void>,
});
WorkerMessage
| Property | Type | Description |
|---|---|---|
id | string | Unique message ID. |
data | any | The message payload (parsed JSON). |
attributes | Record<string, string> | Message attributes / metadata. |
publishedAt | Date | When the message was published. |
WorkerOptions
| Option | Type | Default | Description |
|---|---|---|---|
maxRetries | number | 5 | Maximum retry attempts before dead-lettering. |
ackDeadline | number | 30 | Seconds before the message is redelivered if not acknowledged. |
concurrency | number | 1 | Number of messages processed concurrently. |
ConvoticContext
The same context object available in Custom Routes, providing access to contacts, conversations, messages, workflows, storage, and logger.
Publishing messages to a topic
You can publish messages to your custom topics from routes or other workers:
sdk.registerRoute({
method: "POST",
path: "/orders",
handler: async (req, ctx) => {
// Publish to the custom topic for async processing
await ctx.pubsub.publish("custom.orders.new", {
data: req.body,
attributes: {
source: "api",
},
});
return { status: 202, body: { message: "Order queued for processing" } };
},
});
Error handling and retries
If your handler throws an error, the message is not acknowledged and will be redelivered after the ackDeadline period. After maxRetries attempts, the message is sent to a dead-letter topic.
sdk.registerWorkerHandler({
topic: "custom.orders.new",
options: {
maxRetries: 3,
ackDeadline: 60,
},
handler: async (message, ctx) => {
try {
await processOrder(message.data);
} catch (error) {
ctx.logger.error("Failed to process order", {
orderId: message.data.id,
error: error.message,
});
throw error; // Re-throw to trigger retry
}
},
});
For idempotent processing, use message.id or a business key (e.g., order ID) to detect duplicates. Pub/Sub guarantees at-least-once delivery, so your handler may receive the same message more than once.