メインコンテンツまでスキップ

Custom Workers

Custom workers let you process background messages from Pub/Sub topics. Use them for asynchronous processing, event-driven integrations, or offloading long-running tasks from request handlers.

Registering a worker handler

Use sdk.registerWorkerHandler() inside your module's register function.

Basic example

import { ConvoticModule } from "@convotic/block-sdk";

const module: ConvoticModule = {
name: "order-processor",
version: "1.0.0",

register(sdk) {
sdk.registerWorkerHandler({
topic: "custom.orders.new",
handler: async (message, ctx) => {
const order = message.data;

ctx.logger.info("Processing order", { orderId: order.id });

// Find or create the contact
let contact = await ctx.contacts.findByExternalId(order.customer_id);
if (!contact) {
contact = await ctx.contacts.create({
externalId: order.customer_id,
firstName: order.customer_name,
email: order.customer_email,
});
}

// Tag the contact
await ctx.contacts.addTag(contact.id, "has-order");

// Send a confirmation message if the contact has a conversation
const conversation = await ctx.conversations.findByContactId(contact.id);
if (conversation) {
await ctx.messages.send(conversation.id, {
text: `Thank you for your order #${order.id}! We'll keep you updated.`,
});
}
},
});
},
};

export default module;

Worker handler signature

sdk.registerWorkerHandler({
topic: string,
options?: WorkerOptions,
handler: (message: WorkerMessage, ctx: ConvoticContext) => Promise<void>,
});

WorkerMessage

PropertyTypeDescription
idstringUnique message ID.
dataanyThe message payload (parsed JSON).
attributesRecord<string, string>Message attributes / metadata.
publishedAtDateWhen the message was published.

WorkerOptions

OptionTypeDefaultDescription
maxRetriesnumber5Maximum retry attempts before dead-lettering.
ackDeadlinenumber30Seconds before the message is redelivered if not acknowledged.
concurrencynumber1Number of messages processed concurrently.

ConvoticContext

The same context object available in Custom Routes, providing access to contacts, conversations, messages, workflows, storage, and logger.

Publishing messages to a topic

You can publish messages to your custom topics from routes or other workers:

sdk.registerRoute({
method: "POST",
path: "/orders",
handler: async (req, ctx) => {
// Publish to the custom topic for async processing
await ctx.pubsub.publish("custom.orders.new", {
data: req.body,
attributes: {
source: "api",
},
});

return { status: 202, body: { message: "Order queued for processing" } };
},
});

Error handling and retries

If your handler throws an error, the message is not acknowledged and will be redelivered after the ackDeadline period. After maxRetries attempts, the message is sent to a dead-letter topic.

sdk.registerWorkerHandler({
topic: "custom.orders.new",
options: {
maxRetries: 3,
ackDeadline: 60,
},
handler: async (message, ctx) => {
try {
await processOrder(message.data);
} catch (error) {
ctx.logger.error("Failed to process order", {
orderId: message.data.id,
error: error.message,
});
throw error; // Re-throw to trigger retry
}
},
});
ヒント

For idempotent processing, use message.id or a business key (e.g., order ID) to detect duplicates. Pub/Sub guarantees at-least-once delivery, so your handler may receive the same message more than once.