Issue #2995

Workers work exclusively on a single qpid queue before moving on to the next queue

Added by daviddavis@redhat.com 3 months ago. Updated about 2 months ago.

Status:
CLOSED - NOTABUG
Priority:
High
Sprint/Milestone:
Severity:
3. High
Platform Release:
Blocks Release:
OS:
Backwards Incompatible:
No
Triaged:
Yes
Groomed:
No
Sprint Candidate:
No
Tags:
QA Contact:
Complexity:
Smash Test:
Verified:
No
Verification Required:
No

Description

Using Celery 3, I have the following code in a blah.py file:

from celery import Celery
from celery import task
import time

app = Celery('blah', broker='qpid://localhost//')

@task
def my_task(item_id):
    # Simulate two seconds of work, then report which item was processed.
    time.sleep(2)
    print('Processing item "%s"...' % item_id)

def add_items_to_queue(queue_name, items_count):
    # Dispatch items_count tasks, routing each one to the named queue.
    for i in range(0, items_count):
        my_task.apply_async(('%s-%d' % (queue_name, i),), queue=queue_name)

Boot up celery:

celery worker -A blah -c 1 -Q queue1,queue2

Then boot up python and run:

>>> import blah
>>> blah.add_items_to_queue('queue1', 10); blah.add_items_to_queue('queue2', 2)

First, the results with RabbitMQ, which are somewhat expected in that the worker is not just draining one queue and then moving to the next. I'm not sure what strategy is being used here (it doesn't seem to be round robin), but at least it isn't focusing exclusively on queue1.

[2017-08-30 12:38:33,514: WARNING/Worker-1] Processing item "queue1-0"...
[2017-08-30 12:38:35,518: WARNING/Worker-1] Processing item "queue1-1"...
[2017-08-30 12:38:37,522: WARNING/Worker-1] Processing item "queue1-2"...
[2017-08-30 12:38:39,526: WARNING/Worker-1] Processing item "queue1-3"...
[2017-08-30 12:38:41,529: WARNING/Worker-1] Processing item "queue1-4"...
[2017-08-30 12:38:43,533: WARNING/Worker-1] Processing item "queue1-5"...
[2017-08-30 12:38:45,537: WARNING/Worker-1] Processing item "queue2-0"...
[2017-08-30 12:38:47,541: WARNING/Worker-1] Processing item "queue1-6"...
[2017-08-30 12:38:49,544: WARNING/Worker-1] Processing item "queue1-7"...
[2017-08-30 12:38:51,548: WARNING/Worker-1] Processing item "queue1-8"...
[2017-08-30 12:38:53,552: WARNING/Worker-1] Processing item "queue1-9"...
[2017-08-30 12:38:55,556: WARNING/Worker-1] Processing item "queue2-1"...

The results when I use qpid:

[2017-08-30 12:19:28,857: WARNING/Worker-1] Processing item "queue1-0"...
[2017-08-30 12:19:30,860: WARNING/Worker-1] Processing item "queue1-1"...
[2017-08-30 12:19:32,864: WARNING/Worker-1] Processing item "queue1-2"...
[2017-08-30 12:19:34,868: WARNING/Worker-1] Processing item "queue1-3"...
[2017-08-30 12:19:36,872: WARNING/Worker-1] Processing item "queue1-4"...
[2017-08-30 12:19:38,875: WARNING/Worker-1] Processing item "queue1-5"...
[2017-08-30 12:19:40,879: WARNING/Worker-1] Processing item "queue1-6"...
[2017-08-30 12:19:42,883: WARNING/Worker-1] Processing item "queue1-7"...
[2017-08-30 12:19:44,886: WARNING/Worker-1] Processing item "queue1-8"...
[2017-08-30 12:19:46,890: WARNING/Worker-1] Processing item "queue1-9"...
[2017-08-30 12:19:48,894: WARNING/Worker-1] Processing item "queue2-0"...
[2017-08-30 12:19:50,898: WARNING/Worker-1] Processing item "queue2-1"...

This creates a problem where, for example, workers will work exclusively on the celery queue before pulling tasks from their own reserved worker queues.

History

#2 Updated by daviddavis@redhat.com 3 months ago

  • Description updated (diff)

#3 Updated by daviddavis@redhat.com 3 months ago

After some more investigation, it looks like messages are being handled in the order they are received. To test this:

1. Add some messages to queue1
2. Add some messages to queue2
3. Add some messages to queue1

The messages in queue2 will be executed before the last set of messages in queue1. It isn't preferring one queue over another. It's just FIFO.
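
For concreteness, the test above boils down to something like the following, using the blah module from the description (the batch sizes are arbitrary):

import blah

# 1. Add some messages to queue1
blah.add_items_to_queue('queue1', 5)
# 2. Add some messages to queue2
blah.add_items_to_queue('queue2', 3)
# 3. Add some more messages to queue1
blah.add_items_to_queue('queue1', 5)

# With the qpid transport, the queue2 batch is processed before the second
# queue1 batch: messages are handled strictly in the order they arrived.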

This seems to echo what I see in the code [0]. We're just pulling and handling messages from qpid, with no code to pull from the queues in a round-robin (RR) fashion. @dkliban and I confirmed with the qpid engineers that messages will not be handled RR style when we use next_receiver. They seemed to indicate that there's no guaranteed order.

So it sounds like we need to handle RR ourselves. One option they mentioned was to set the receiver capacity to 1, but it looks like we're already doing that. The next option is to not use next_receiver but to iterate through the receivers ourselves.

[0] https://github.com/celery/kombu/blob/2a161a4408a4322351cb882f6cdd98019ce22d5e/kombu/transport/qpid.py#L1695-L1697
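
As a rough illustration of that second option (this is only a sketch, not the actual patch: drain_round_robin and per_queue are made-up names, and callbacks stands in for the transport's connection._callbacks mapping):

from qpid.messaging.exceptions import Empty

def drain_round_robin(session, callbacks, per_queue=1):
    # Visit each bound queue's receiver in turn instead of relying on
    # session.next_receiver(), fetching at most per_queue messages apiece.
    for receiver in session.receivers:
        for _ in range(per_queue):
            try:
                message = receiver.fetch(timeout=0)  # non-blocking fetch
            except Empty:
                break  # this queue is drained; move on to the next one
            callbacks[receiver.source](message)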

#4 Updated by daviddavis@redhat.com 3 months ago

Doing some more digging/debugging, I looked at iterating through receivers in 'drain_events'. I don't think the 'drain_events' method is the right place to introduce the needed RR functionality.

Here's the patch I was using to iterate through receivers though:

https://gist.github.com/daviddavis/f5bb18b3a9f14c48a57257cb05df6c55

#5 Updated by daviddavis@redhat.com 3 months ago

Here's a patch I've been using to test out things in pulp:

https://gist.github.com/daviddavis/f1426796a19c73fb081840dee0fd4854

You can adjust the 200 to whatever number you want. To actually test, trigger a sync:

pulp-admin rpm repo sync run --repo-id zoo

Then watch the qpid queues:

watch qpid-stat -q

#6 Updated by jortel@redhat.com 3 months ago

Looks like Celery does not support round-robin consumption of tasks on multiple queues as a first-class concept. Instead it relies on the kombu transport implementation. For amqp (rabbit), the behavior mostly resembles FIFO but with some randomness. For qpid, it's basically FIFO. I don't believe that modifying the kombu transports to achieve round-robin would be appropriate, as kombu is a general abstraction for messaging. Instead, round-robin support could be added to Celery.

That said, I did work up a patch to implement round-robin in the qpid transport, mainly to illustrate that the consumption order is dictated by the transport, and to have in our back pocket.

I don't recommend this but ...

[jortel@localhost transport]$ diff -Naur qpid.py.saved qpid.py
--- qpid.py.saved    2017-08-31 11:28:05.967912179 -0500
+++ qpid.py    2017-08-31 11:27:41.026387415 -0500
@@ -1640,18 +1640,19 @@
             all messages are drained. Defaults to 0.
         :type timeout: int
         """ 
-        start_time = time.time()
+        ready = set()
         elapsed_time = -1
+        start_time = time.time()
         while elapsed_time < timeout:
             try:
                 receiver = self.session.next_receiver(timeout=timeout)
-                message = receiver.fetch()
-                queue = receiver.source
+                ready.add(receiver)
             except QpidEmpty:
                 raise socket.timeout()
-            else:
-                connection._callbacks[queue](message)
             elapsed_time = time.time() - start_time
+        for receiver in sorted(ready, key=lambda r: r.source):
+            message = receiver.fetch()
+            connection._callbacks[receiver.source](message)
         raise socket.timeout()

     def create_channel(self, connection):
[jortel@localhost transport]$ 

Using the test harness described above:

[2017-08-31 11:38:39,937: WARNING/MainProcess] celery@localhost.localdomain ready.
[2017-08-31 11:38:41,954: WARNING/Worker-1] Processing item "queue1-0"...
[2017-08-31 11:38:43,957: WARNING/Worker-1] Processing item "queue2-0"...
[2017-08-31 11:38:45,966: WARNING/Worker-1] Processing item "queue1-1"...
[2017-08-31 11:38:47,972: WARNING/Worker-1] Processing item "queue2-1"...
[2017-08-31 11:38:49,983: WARNING/Worker-1] Processing item "queue1-2"...
[2017-08-31 11:38:51,989: WARNING/Worker-1] Processing item "queue2-2"...
[2017-08-31 11:38:53,998: WARNING/Worker-1] Processing item "queue1-3"...
[2017-08-31 11:38:56,007: WARNING/Worker-1] Processing item "queue2-3"...
[2017-08-31 11:38:58,013: WARNING/Worker-1] Processing item "queue1-4"...
[2017-08-31 11:39:00,024: WARNING/Worker-1] Processing item "queue2-4"...
[2017-08-31 11:39:02,032: WARNING/Worker-1] Processing item "queue1-5"...
[2017-08-31 11:39:04,041: WARNING/Worker-1] Processing item "queue2-5"...
[2017-08-31 11:39:06,050: WARNING/Worker-1] Processing item "queue1-6"...
[2017-08-31 11:39:08,060: WARNING/Worker-1] Processing item "queue2-6"...
[2017-08-31 11:39:10,070: WARNING/Worker-1] Processing item "queue1-7"...
[2017-08-31 11:39:12,077: WARNING/Worker-1] Processing item "queue2-7"...
[2017-08-31 11:39:14,086: WARNING/Worker-1] Processing item "queue1-8"...
[2017-08-31 11:39:16,094: WARNING/Worker-1] Processing item "queue2-8"...
[2017-08-31 11:39:18,103: WARNING/Worker-1] Processing item "queue1-9"...
[2017-08-31 11:39:20,110: WARNING/Worker-1] Processing item "queue2-9"...

#7 Updated by ttereshc 3 months ago

  • Priority changed from Normal to High
  • Sprint/Milestone set to Sprint 24
  • Severity changed from 2. Medium to 3. High
  • Triaged changed from No to Yes

#8 Updated by daviddavis@redhat.com 3 months ago

We discussed some other options to work around this, even if just temporarily, namely:

  • Have workers either work from the celery queue or from their reserved queues but not both. This would however require a major change to how tasks are assigned.
  • Have only some workers listen to the celery queue. However, tasks that get assigned to these workers' reserved queues would still be stuck.

#9 Updated by jortel@redhat.com 3 months ago

  • Sprint/Milestone changed from Sprint 24 to Sprint 25

#10 Updated by mhrivnak 2 months ago

Summarizing my understanding of the current state:

  • We determined that this is not a regression. It has been the behavior all along.
  • We determined that celery workers do not round-robin as we expected when watching multiple queues.
  • The goal at this point is to ensure that a flood of work in the "celery" queue does not halt progress on the reserved resource queues.
  • There is likely a use case for the opposite; we do not want a flood of reserved resource work to block all progress on the "celery" queue.

The common way to deal with different classifications of tasks is to have different workers dedicated to each. We started with one fleet of workers that can do everything, but even then we knew that some day we might need to get more specific about which task types go to which workers.

I think we could make this change without much effort:

  • Stop having the reserved resource workers listen on the general "celery" queue.
  • Make a single new worker that does listen on the celery queue and can auto-scale. [0]

[0] http://docs.celeryproject.org/en/latest/userguide/workers.html#autoscaling

The auto-scaling feature has been in celery for a long time. As many of us know, each logical "worker" right now has a single parent process and a single child. The parent listens to the queue(s) and manages the child. The child does all of the actual work. (what cruel parenting!) But a single logical worker can have many child processes, and it can even auto-scale the number of children based on demand.

Pulp sets the "concurrency" [1] setting to 1 to guarantee that resource-reserving work is FIFO for any given worker. We do not have that requirement for work in the "celery" queue. Allowing a single logical worker to auto-scale with multiple children would keep the memory footprint relatively low but enable work on the "celery" queue to scale up quickly as needed.

[1] http://docs.celeryproject.org/en/latest/reference/celery.bin.worker.html#cmdoption-celery-worker-c
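
For illustration, such a worker could be started with a command along these lines (the autoscale bounds 8,1 are arbitrary, and -A blah just reuses the reproducer app from the description):

celery worker -A blah -Q celery --autoscale=8,1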

We would need to:

  • Find where current workers decide which queues to watch and make them not watch the "celery" queue.
  • Make a new systemd unit file that starts a single worker with auto-scaling on and the default queue of "celery".
  • Make the max number of "celery" workers configurable somehow.

Are there any gotchas I'm not thinking about?

#11 Updated by mhrivnak 2 months ago

daviddavis@redhat.com wrote:

  • Have workers either work from the celery queue or from their reserved queues but not both. This would however require a major change to how tasks are assigned.

How would this require changes to how tasks are assigned? When a task gets queued, a specific queue is chosen at that moment. Which workers happen to be listening to which queues should not be a concern for the code doing the queueing. In theory that's a nice benefit of separating producers from consumers.

#12 Updated by cduryee 2 months ago

this sounds reasonable to me. Would there be pulp_workers_reserved and pulp_workers_unreserved, or would it all be handled under pulp_workers?

#13 Updated by bmbouter 2 months ago

I think we should avoid having more specialized workers. It's harder to scale, deploy, etc. Here is an alternative:

We can adjust the celery implementation to dispatch round-robin to the existing worker-dedicated queues instead of the celery queue. This will prevent starvation of the different classes of work while also allowing the workers to remain generic. There are some edge cases around durability because the dedicated worker queues are non-durable, while the celery queue is durable. That should be manageable.

What about ^ approach?
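
For illustration, round-robin dispatch on the producer side could look roughly like the sketch below (the queue names and the dispatch helper are hypothetical, not existing Pulp code):

import itertools

# Hypothetical list of the workers' dedicated queues.
WORKER_QUEUES = ['reserved_resource_worker-0', 'reserved_resource_worker-1']
_queue_cycle = itertools.cycle(WORKER_QUEUES)

def dispatch(task, args=(), kwargs=None):
    # Rotate unreserved work across the dedicated worker queues instead of
    # sending it all to the shared, durable "celery" queue.
    return task.apply_async(args, kwargs or {}, queue=next(_queue_cycle))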

#14 Updated by cduryee 2 months ago

so there wouldn't be a 'celery' queue anymore, and all work would go to the dedicated queues?

Ah, I did not parse it correctly at first. Ya, this sounds reasonable to me as well and doesn't involve config changes.

#15 Updated by mhrivnak 2 months ago

bmbouter wrote:

I think we should avoid having more specialized workers. It's harder to scale, deploy, etc. Here is an alternative:

We can adjust the celery implementation to dispatch round-robin to the existing worker-dedicated queues instead of the celery queue. This will prevent starvation of the different classes of work while also allowing the workers to remain generic. There are some edge cases around durability because the dedicated worker queues are non-durable, while the celery queue is durable. That should be manageable.

What about ^ approach?

I like the simplicity. But I think it would leave us with the same problem that all work would get done in approximately the order in which it was queued. The user request is basically this:

Given that there are two categories of work:
1. Repo-locking work such as syncs and publishes.
2. Other background work like applicability calculation, background downloading, orphan cleanup, etc.

Regardless of how much work of one category is currently queued, if new work of the other category gets queued, progress should be made on both categories concurrently.

I think if we put all tasks into the worker-dedicated queues, we would get behavior similar to today's: tasks would be completed in order regardless of which category they fall into.

#16 Updated by mhrivnak 2 months ago

cduryee wrote:

this sounds reasonable to me. Would there be pulp_workers_reserved and pulp_workers_unreserved, or would it all be handled under pulp_workers?

Something like that. We would need a single additional service, and we could brainstorm on the name. Maybe:

  • pulp_worker_unreserved
  • pulp_worker_general
  • pulp_worker_background

In theory we could even make this optional. That's another nice part of separating producers from consumers. For users who are happy with how things work now, they could continue with the current workers listening to both queues. Users who want the separation could enable this additional service and have the other workers not listen to the "celery" queue.

#17 Updated by daviddavis@redhat.com 2 months ago

Just some notes from a discussion today with @bmbouter, @beav, and others. It sounds like the user is experiencing an issue in which workers aren't performing any work. They have PULP_CONCURRENCY set to 16, but at times they see as few as 2 workers actually processing tasks, so we think it is this bug:

https://pulp.plan.io/issues/2979

They're also on an instance of Katello which is known to be affected (PULP_MAX_TASKS_PER_CHILD is set to 2 in their particular release). We've recommended they first try disabling PULP_MAX_TASKS_PER_CHILD to see if that helps free up workers to do work and, in turn, alleviates the congestion from regenerate applicability tasks.

If this alone fails to remedy their problem, we recommended increasing the number of workers from 16 to somewhere between 25 and 32, as their Satellite box has 128GB of memory. This should help them process tasks more quickly. Another possibility would be to have the customer run a separate box with extra pulp workers (per https://docs.pulpproject.org/en/2.12/user-guide/scaling.html).

Longer term, we talked briefly about how to prioritize/categorize tasks, but we thought that would be a rather complex problem, especially trying to come up with a one-size-fits-all solution for all of our users. Instead, we decided to look at making it easier for Katello/Satellite users to scale their Pulp deployments, thereby avoiding the problem of having to optimize how tasks are pulled from queues.

#18 Updated by daviddavis@redhat.com 2 months ago

  • Status changed from NEW to ASSIGNED
  • Assignee set to daviddavis@redhat.com

#19 Updated by daviddavis@redhat.com 2 months ago

Asked for updates but have not heard anything.

#21 Updated by mhrivnak about 2 months ago

  • Sprint/Milestone changed from Sprint 25 to Sprint 26

#22 Updated by daviddavis@redhat.com about 2 months ago

  • Status changed from ASSIGNED to CLOSED - NOTABUG

Talked this over with @bmbouter and decided to close this as a NOTABUG since the user seems to no longer be experiencing this issue. We believe that fixing #2979 and configuring the environment appropriately should alleviate this issue. We can reopen and revisit this bug if that fails.
