Project

Profile

Help

Issue #2979

Updated by daviddavis over 7 years ago

If the PULP_MAX_TASKS_PER_CHILD option in /etc/default/pulp_workers is uncommented, and a mongodb replica set is used, celery workers are prone to deadlock when handling tasks.    When this happens, all tasks assigned to the deadlocked worker will not be processed.    The parent celery process still participates in the worker heartbeat, so the system doesn't automatically recover from this. 

 In this scenario, the celery workers will log the warning: 

 <pre> 
 UserWarning: MongoClient opened before fork. Create MongoClient with connect=False, or create client after forking. See PyMongo's documentation for details: http://api.mongodb.org/python/current/faq.html#using-pymongo-with-multiprocessing> 
 </pre> 

 The problem seems to be exactly as mentioned on the linked page: "Instances of MongoClient copied from the parent process have a high probability of deadlock in the child process due to inherent incompatibilities between fork(), threads, and locks. PyMongo will attempt to issue a warning if there is a chance of this deadlock occurring." 

 I was able to reproduce this on the Pulp development environment in 2.14-dev ( bd1f496bd99175db45e585418327fc9834b8ee53 ) 

 Starting from a clean development environment, it could be reproduced by: 

 1. Adjust following config to use replica set and max tasks per child: 

 <pre> 
 --- /etc/mongod.conf.old 	 2017-08-15 00:44:15.092104023 +0000 
 +++ /etc/mongod.conf 	 2017-08-15 00:49:29.418767574 +0000 
 @@ -112,7 +112,8 @@ 
  #operationProfiling: 
 
  # replication Options - ReplSet settings 
 -#replication: 
 +replication: 
 +    replSetName: rs 
 
  # sharding Options - Shard settings 
  #sharding: 

 --- /etc/pulp/server.conf.old 	 2017-08-15 00:49:54.364820235 +0000 
 +++ /etc/pulp/server.conf 	 2017-08-15 00:50:02.265836914 +0000 
 @@ -50,7 +50,7 @@ 
  # seeds: localhost:27017 
  # username: 
  # password: 
 -# replica_set: 
 +replica_set: rs 
  # ssl: false 
  # ssl_keyfile: 
  # ssl_certfile: 


 --- /etc/default/pulp_workers.old 	 2017-08-15 01:23:41.052240219 +0000 
 +++ /etc/default/pulp_workers 	 2017-08-15 01:23:47.120262349 +0000 
 @@ -9,4 +9,4 @@ 
 
  # To avoid memory leaks, Pulp can terminate and replace a worker after processing X tasks. If 
  # left commented, process recycling is disabled. PULP_MAX_TASKS_PER_CHILD must be > 0. 
 -# PULP_MAX_TASKS_PER_CHILD=2 
 +PULP_MAX_TASKS_PER_CHILD=2 

 </pre> 

 2. Initialize mongo replica set - in mongo shell: 

 <pre> 
  rs.initiate({"_id":"rs",members:[{_id:0,host:"localhost:27017"}]}) 
 </pre> 

 3. Make a couple of timing changes to code so the issue is more easily reproducible 

 (It should be possible to reproduce without this, but may require being *very* patient) 

 <pre> 
 diff --git a/server/pulp/server/db/connection.py b/server/pulp/server/db/connection.py 
 index 9cf01e0..f38ad92 100644 
 --- a/server/pulp/server/db/connection.py 
 +++ b/server/pulp/server/db/connection.py 
 @@ -63,6 +63,9 @@ def initialize(name=None, seeds=None, max_pool_size=None, replica_set=None, max_ 
      try: 
          connection_kwargs = {} 
 
 +          # more frequent heartbeat makes race easier to reproduce 
 +          connection_kwargs['heartbeatfrequencyms'] = 1000 
 + 
          if name is None: 
              name = config.config.get('database', 'name') 


 --- /usr/lib64/python2.7/site-packages/pymongo/pool.py.old 	 2017-08-15 01:16:41.625710547 +0000 
 +++ /usr/lib64/python2.7/site-packages/pymongo/pool.py 	 2017-08-15 01:17:14.421830156 +0000 
 @@ -16,6 +16,7 @@ 
  import os 
  import socket 
  import threading 
 +import time 
 
  from bson import DEFAULT_CODEC_OPTIONS 
  from bson.py3compat import itervalues 
 @@ -608,6 +609,8 @@ 
                  # set.pop() isn't atomic in Jython less than 2.7, see 
                  # http://bugs.jython.org/issue1854 
                  with self.lock: 
 +                      # simulate unfavorable scheduling while holding lock 
 +                      time.sleep(.1) 
                      sock_info, from_pool = self.sockets.pop(), True 
              except KeyError: 
                  # Can raise ConnectionFailure or CertificateError. 
 </pre> 

 4. Restart mongo and pulp services 

 5. Start watching logs for completed tasks, e.g. 

 <pre> 
 journalctl -f | grep 'succeeded in' 
 </pre> 

 6. Flood celery worker with many tasks 

 e.g. I used this at the shell: 

 <pre> 
 enqueue(){ celery --app=pulp.server.async.app call --exchange=C.dq --routing-key=reserved_resource_worker-2@pulp2.dev pulp.server.async.tasks._release_resource '--args=["test"]'; } 
 while true; do for i in $(seq 1 5); do for j in $(seq 1 20); do enqueue & done; sleep 1; done; wait; done 
 </pre> 

 7. Observe logs 

 Watch the logs for some time (I needed about 1 hour to reproduce it most recently). 

 If messages about tasks succeeding stop appearing in logs, it may mean the deadlock has occurred. 

 When a worker has deadlocked like this then the following symptoms can be observed: 

 pstree looks like: 

 <pre> 
   |-celery,20139 /usr/bin/celery worker -n reserved_resource_worker-2@%h -A pulp.server.async.app -c 1 --events --umask 18 --pidfile=/v 
 ar/run/pulp/reserved_resource_worker-2.pid --maxtasksperchild=2 
   |     |-celery,13916 /usr/bin/celery worker -n reserved_resource_worker-2@%h -A pulp.server.async.app -c 1 --events --umask 18 --pidfile=/var/run/pulp/reserved_resource_worker-2.pid --maxtasksperchild=2 
   |     |     `-{celery},13917 
   |     |-{celery},20315 
   |     |-{celery},20316 
   |     `-{celery},20421 
 </pre> 

 strace looks like: 

 <pre> 
 [vagrant@pulp2 ~]$ sudo strace -p 13916 -f 
 strace: Process 13916 attached with 2 threads 
 [pid 13917] futex(0x7f89478f0bf0, FUTEX_WAIT_BITSET_PRIVATE|FUTEX_CLOCK_REALTIME, 0, NULL, 0xffffffff <unfinished ...> 
 [pid 13916] futex(0x7f89478f0bf0, FUTEX_WAIT_BITSET_PRIVATE|FUTEX_CLOCK_REALTIME, 0, NULL, 0xffffffff 
 </pre> 

 (threads waiting on a lock which will never be unlocked) 

 python backtraces (collected with gdb) look like: 

 <pre> 
 Thread 2 (Thread 0x7f8920910700 (LWP 13917)): 
 Traceback (most recent call first): 
   Waiting for the GIL 
   File "/usr/lib64/python2.7/site-packages/pymongo/topology.py", line 338, in update_pool 
     with self._lock: 
   File "/usr/lib64/python2.7/site-packages/pymongo/mongo_client.py", line 1075, in _process_periodic_tasks 
     self._topology.update_pool() 
   File "/usr/lib64/python2.7/site-packages/pymongo/mongo_client.py", line 406, in target 
     MongoClient._process_periodic_tasks(client) 
   File "/usr/lib64/python2.7/site-packages/pymongo/periodic_executor.py", line 93, in _run 
     if not self._target(): 
   File "/usr/lib64/python2.7/threading.py", line 757, in run 
     self.__target(*self.__args, **self.__kwargs) 
   File "/usr/lib64/python2.7/threading.py", line 804, in __bootstrap_inner 
     self.run() 
   File "/usr/lib64/python2.7/threading.py", line 777, in __bootstrap 
     self.__bootstrap_inner() 

 Thread 1 (Thread 0x7f8945632700 (LWP 13916)): 
 Traceback (most recent call first): 
   Waiting for the GIL 
   File "/usr/lib64/python2.7/site-packages/pymongo/topology.py", line 137, in open 
     with self._lock: 
   File "/usr/lib64/python2.7/site-packages/pymongo/mongo_client.py", line 757, in _get_topology 
     self._topology.open() 
   File "/usr/lib64/python2.7/site-packages/pymongo/mongo_client.py", line 819, in _send_message_with_response 
     topology = self._get_topology() 
   File "/usr/lib64/python2.7/site-packages/pymongo/cursor.py", line 850, in __send_message 
     **kwargs) 
   File "/usr/lib64/python2.7/site-packages/pymongo/cursor.py", line 1012, in _refresh 
     self.__read_concern)) 
   File "/usr/lib64/python2.7/site-packages/pymongo/cursor.py", line 1090, in next 
     if len(self.__data) or self._refresh(): 
   File "/usr/lib/python2.7/site-packages/mongoengine/queryset/base.py", line 1407, in next 
     raw_doc = self._cursor.next() 
   File "/home/vagrant/devel/pulp/server/pulp/server/async/tasks.py", line 289, in _release_resource 
     for running_task in running_task_qs: 
   File "/usr/lib/python2.7/site-packages/celery/app/trace.py", line 438, in __protected_call__ 
     return self.run(*args, **kwargs) 
   File "/home/vagrant/devel/pulp/server/pulp/server/async/tasks.py", line 107, in __call__ 
     return super(PulpTask, self).__call__(*args, **kwargs) 
   File "/usr/lib/python2.7/site-packages/celery/app/trace.py", line 240, in trace_task 
     R = retval = fun(*args, **kwargs) 
   File "/usr/lib/python2.7/site-packages/celery/app/trace.py", line 349, in _fast_trace_task 
     return _tasks[task].__trace__(uuid, args, kwargs, request)[0] 
   File "/usr/lib64/python2.7/site-packages/billiard/pool.py", line 367, in workloop 
     result = (True, prepare_result(fun(*args, **kwargs))) 
   File "/usr/lib64/python2.7/site-packages/billiard/pool.py", line 295, in run 
     sys.exit(self.workloop(pid=pid)) 
   File "/usr/lib64/python2.7/site-packages/billiard/process.py", line 292, in _bootstrap 
     self.run() 
   File "/usr/lib64/python2.7/site-packages/billiard/forking.py", line 105, in __init__ 
     code = process_obj._bootstrap() 
   File "/usr/lib64/python2.7/site-packages/billiard/process.py", line 137, in start 
     self._popen = Popen(self) 
   File "/usr/lib64/python2.7/site-packages/billiard/pool.py", line 1068, in _create_worker_process 
     w.start() 
   File "/usr/lib64/python2.7/site-packages/billiard/pool.py", line 1237, in _repopulate_pool 
     self._create_worker_process(self._avail_index()) 
   File "/usr/lib64/python2.7/site-packages/billiard/pool.py", line 1252, in _maintain_pool 
     self._repopulate_pool(joined) 
   File "/usr/lib64/python2.7/site-packages/billiard/pool.py", line 1260, in maintain_pool 
     self._maintain_pool() 
   File "/usr/lib/python2.7/site-packages/celery/concurrency/asynpool.py", line 415, in _event_process_exit 
     self.maintain_pool() 
   File "/usr/lib/python2.7/site-packages/kombu/async/hub.py", line 340, in create_loop 
     cb(*cbargs) 
   File "/usr/lib/python2.7/site-packages/celery/worker/loops.py", line 76, in asynloop 
     next(loop) 
   File "/usr/lib/python2.7/site-packages/celery/worker/consumer.py", line 838, in start 
     c.loop(*c.loop_args()) 
   File "/usr/lib/python2.7/site-packages/celery/bootsteps.py", line 123, in start 
     step.start(parent) 
   File "/usr/lib/python2.7/site-packages/celery/worker/consumer.py", line 279, in start 
     blueprint.start(self) 
   File "/usr/lib/python2.7/site-packages/celery/bootsteps.py", line 374, in start 
     return self.obj.start() 
   File "/usr/lib/python2.7/site-packages/celery/bootsteps.py", line 123, in start 
     step.start(parent) 
   File "/usr/lib/python2.7/site-packages/celery/worker/__init__.py", line 206, in start 
     self.blueprint.start(self) 
   File "/usr/lib/python2.7/site-packages/celery/bin/worker.py", line 212, in run 
     state_db=self.node_format(state_db, hostname), **kwargs 
   File "/usr/lib/python2.7/site-packages/celery/bin/base.py", line 274, in __call__ 
     ret = self.run(*args, **kwargs) 
   File "/usr/lib/python2.7/site-packages/celery/bin/worker.py", line 179, in run_from_argv 
     return self(*args, **options) 
   File "/usr/lib/python2.7/site-packages/celery/bin/celery.py", line 694, in execute 
     ).run_from_argv(self.prog_name, argv[1:], command=argv[0]) 
   File "/usr/lib/python2.7/site-packages/celery/bin/celery.py", line 762, in handle_argv 
     return self.execute(command, argv) 
   File "/usr/lib/python2.7/site-packages/celery/bin/base.py", line 311, in execute_from_commandline 
     return self.handle_argv(self.prog_name, argv[1:]) 
   File "/usr/lib/python2.7/site-packages/celery/bin/celery.py", line 770, in execute_from_commandline 
     super(CeleryCommand, self).execute_from_commandline(argv))) 
   File "/usr/lib/python2.7/site-packages/celery/bin/celery.py", line 81, in main 
     cmd.execute_from_commandline(argv) 
   File "/usr/lib/python2.7/site-packages/celery/__main__.py", line 30, in main 
     main() 
   File "/usr/bin/celery", line 9, in <module> 
     load_entry_point('celery', 'console_scripts', 'celery')() 
 </pre> 


Back