I think I hit a bug with the coded workflows and queue methods.
Summary:
The client.register_queue method creates a queue of type "queue" by default. However, the push_workitem and pop_workitem API methods seem to require the queue to be of type "workitemqueue": if I manually change the queue type in the db, I can then successfully use the push and pop methods in code.
Steps to Reproduce:
Use register_queue to create a queue (e.g., "queue-node").
Attempt to push a work item using push_workitem to "queue-node".
Observe the error Push workitem failed: ServerError("\"Work item queue not found queue-node (undefined) not found.\"")
Manually change the queue type in the db to workitemqueue, then retry push or pop and observe that it works.
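For reference, this is roughly the code path from the steps above. It's a minimal sketch assuming the openiap Python client; the queue name comes from the steps, but the exact method signatures (queuename, wiq, payload, the callback shape) are based on the examples I'm working from, so treat them as approximate.

```python
from openiap import Client  # assuming the openiap Python client package

client = Client()
client.connect()  # connection details (apiurl/jwt) assumed to come from environment variables

# register_queue creates "queue-node" with type "queue" if it does not already exist
client.register_queue(queuename="queue-node", callback=lambda message, payload: None)

# push_workitem expects a queue of type "workitemqueue", so this fails with:
# Push workitem failed: ServerError("\"Work item queue not found queue-node (undefined) not found.\"")
client.push_workitem(wiq="queue-node", payload={"hello": "world"})
```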
Conclusion
Not sure if I'm using it wrong, but for now it doesn't work out of the box with the provided default examples for the queue listener or workitem processor.
Message queues: This gives you access to a traditional message queue system, where you can register either a queue or an exchange. In OpenCore, by default, the queue is stateless, so there is always a small risk of losing a message.
Workitem queues: This gives you access to a more pub/sub-like subscriber system. Here we ensure transactional safety around the work items, we have more control over retries, we can set any type of expiration (timeout), and we can control the flow of messages through multiple workitem queues.
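To make the distinction concrete, here is a hedged sketch of how the two are typically used from code (Python, assuming the openiap client; queue_message, push_workitem, pop_workitem and update_workitem correspond to the operations described above, but the parameter names and the workitem state handling are assumptions based on the examples, not a definitive API reference):

```python
from openiap import Client  # assuming the openiap Python client package

client = Client()
client.connect()

# Message queue: fire-and-forget and stateless, so a message can in rare cases be lost
client.queue_message(queuename="my-message-queue", data={"hello": "world"})

# Workitem queue: transactional, with retries and expiration; items are pushed,
# popped, and then explicitly updated with their final state
client.push_workitem(wiq="my-workitem-queue", payload={"hello": "world"})
workitem = client.pop_workitem(wiq="my-workitem-queue")
if workitem is not None:
    workitem.state = "successful"  # or "retry" to put it back on the queue
    client.update_workitem(workitem)
```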
Now, to your problem, and the confusing part. Since we want to avoid clients constantly asking OpenCore “do you have something for me?” wasting network and processing resources, we flip it around and allow clients to consume a workitem queue as if it were a message queue, and we then send an empty message to the client when there is work to be processed.
For this reason, you cannot create a message queue and workitem queue with the same name, since a workitem queue is also a message queue (with a specific purpose).
I was about to write that it somehow works, although it is not very straightforward.
For example, when trying to force the workitem queue to notify an agent with a daemon or non-daemon package (the default workitem listener), it throws the error: packageId is Expected string, received object
But if I leave the package empty, it still somehow runs, because I guess the package is set as daemon. And I managed to process a batch of workitems, but if I requeue them, they don't process anymore, even though the daemon process is still running… so I have to figure this out a little more.
Setting an agent and package was a “poor man’s” solution/workaround for calling a script per workitem without making the code in the script aware of OpenCore and workitem queues. In this mode, we set a few environment variables that the script needs in order to know where to find the payload and files, and the agent handles updating the workitems based on the exit code of the process.
For “real” code/scripts where you have installed the OpenIAP client libraries for the language you’re working with, you call register_queue using the name of the workitem queue, and when you receive a message, you start popping workitems until you no longer receive an item.
And yes, in this mode, your code needs to run as a daemon, so it can wait for a message to be sent on the message queue.
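In code, that pattern looks roughly like this (a hedged sketch assuming the openiap Python client; the method and parameter names mirror the example repositories, so double-check them against the client version you're using, and the queue name is hypothetical):

```python
import time
from openiap import Client  # assuming the openiap Python client package

client = Client()
client.connect()

WIQ = "my-workitem-queue"  # hypothetical workitem queue name

def on_message(message, payload):
    # The message carries no data; it only signals that there is work to do.
    # Keep popping until the workitem queue is drained.
    while True:
        workitem = client.pop_workitem(wiq=WIQ)
        if workitem is None:
            break
        try:
            # ... process workitem.payload here ...
            workitem.state = "successful"
        except Exception:
            workitem.state = "retry"  # let the queue's retry handling take over
        client.update_workitem(workitem)

# Consume the message queue that shares its name with the workitem queue
client.register_queue(queuename=WIQ, callback=on_message)

# Run as a daemon so we stay connected and keep receiving notifications
while True:
    time.sleep(1)
```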
ps: this will change once we launch serverless functions. Then it will be possible to spawn a function with your code whenever a message queue message is sent. This way you can avoid having an agent running all the time just to run your code.
If you want to work with workitems, you must create the workitem queue first.
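If you are not creating it through the admin UI, one hedged workaround (mirroring the manual db change from the report above) is to insert the queue document with the right type yourself. The "mq" collection name and the _type value follow what the report edited by hand, the insert_one usage is an assumption, and the server may expect more fields than shown here, so the admin UI remains the safer route:

```python
from openiap import Client  # assuming the openiap Python client package

client = Client()
client.connect()

# Create the workitem queue entry before pushing or popping workitems.
# Mirrors "manually change the queue type in db" from the report above.
client.insert_one(
    collectionname="mq",
    item={"name": "queue-node", "_type": "workitemqueue"},
)
```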
If you are using any of the example repositories on how to work with workitem queues, you will find they all use register_queue to consume the message queue associated with the workitem queue; this is to ensure the best performance when you have a large number of agents consuming workitem queues.
Yes, register_queue will create a queue if no queue already exists, hence the register part in the name. You could argue consume_queue might be better, but I think it's a bit late to change it now.