Issue #2613
Updated by dalley almost 8 years ago
This issue was introduced by the addition of the Celery bootstep code enabling each worker to write their own heartbeats. https://github.com/pulp/pulp/pull/2922 This is not a Pulp-specific problem - it appears to occur even with the most simple uses of Celery bootstep / extensions. To reproduce in Pulp: <pre> 1. prestart and smoke test with zoo repo sync 2. sudo systemctl stop qpidd 3. wait 30 seconds 4. observe connection error messages in the logs 4. sudo systemctl start qpidd 5. observe that all processes recover except celerybeat are seen to go offline and no longer have workers records in the database, despite the fact that the which shows an exception regularly which reads: pulp.server.async.scheduler:ERROR: connection error messages have now stopped aborted 6. prestart sudo systemctl stop pulp_celerybeat 7. sudo systemctl start pulp_celerybeat 8. sync zoo repo successfully </pre> You can also test this with pulp smash by running: <pre> workon pulp-smash python3 -m unittest pulp_smash.tests.rpm.api_v2.test_broker.BrokerTestCase.test_broker_reconnect </pre> To reproduce this generically, save the following as a python file and run with the command ```celery worker -A <file_name>.app``` <pre> from celery import Celery from celery import bootsteps class Reproducer(bootsteps.StartStopStep): requires = ('celery.worker.components:Timer', ) def __init__(self, parent, **kwargs): # here we can prepare the Worker/Consumer object # in any way we want, set attribute defaults, and so on. print('{0!r} is in init'.format(parent)) def start(self, worker): self.timer_ref = worker.timer.call_repeatedly( 5, self.do_work, (worker, ), priority=10, ) def do_work(self, worker): print('{0!r} heartbeat'.format(worker)) def stop(self, parent): print('{0!r} is stopping'.format(parent)) def shutdown(self, parent): print('{0!r} is shutting down'.format(parent)) app = Celery(broker='qpid://') Celery(broker='amqp://') app.steps['worker'].add(Reproducer) </pre> The same symptoms occur when using the RabbitMQ broker instead of Qpid, but with different error messages.