Project

Profile

Help

Issue #2613

Updated by dalley about 7 years ago

This issue was introduced by only affects Pulp when used with the addition of the Celery bootstep code enabling each worker to write their own heartbeats. 

 https://github.com/pulp/pulp/pull/2922 

 This is not Celery4+Kombu4. For example in a Pulp-specific problem - it appears to occur even with Fedora26 environment or if you manually install the most simple uses of Celery bootstep / extensions. stack via pip. 

 To reproduce in Pulp: reproduce: 
 <pre> 
 1. prestart and smoke test with zoo repo sync 
 2. sudo systemctl stop qpidd 
 3. wait 30 seconds 
 4. sudo systemctl start qpidd 
 5. observe that all processes recover except celerybeat which shows an exception regularly which reads: pulp.server.async.scheduler:ERROR: connection aborted 
 6. sudo systemctl stop pulp_celerybeat 
 7. sudo systemctl start pulp_celerybeat 
 8. sync zoo repo successfully 
 </pre> 

 You can also test this with pulp smash by running: 

 <pre> 
 workon pulp-smash 
 python3 -m unittest pulp_smash.tests.rpm.api_v2.test_broker.BrokerTestCase.test_broker_reconnect 
 </pre> 

 To reproduce this generically, save the following as a python file and run with the command ```celery worker -A <file_name>.app``` 

 <pre> 
 from celery import Celery 
 from celery import bootsteps 

 class Reproducer(bootsteps.StartStopStep): 
     requires = ('celery.worker.components:Timer', ) 

     def __init__(self, parent, **kwargs): 
         # here we can prepare the Worker/Consumer object 
         # in any way we want, set attribute defaults, and so on. 
         print('{0!r} is in init'.format(parent)) 

     def start(self, worker): 
         self.timer_ref = worker.timer.call_repeatedly( 
             5, 
             self.do_work, 
             (worker, ), 
             priority=10, 
         ) 

     def do_work(self, worker): 
         print('{0!r} heartbeat'.format(worker)) 

     def stop(self, parent): 
         print('{0!r} is stopping'.format(parent)) 

     def shutdown(self, parent): 
         print('{0!r} is shutting down'.format(parent)) 

 app = Celery(broker='amqp://') 
 app.steps['worker'].add(Reproducer) 
 </pre>

Back