Project

Profile

Help

Issue #1580 » 1580.patch

warning suppression context manager - semyers, 02/10/2016 08:45 PM

View differences:

server/pulp/server/async/tasks.py
20 20
    DEDICATED_QUEUE_EXCHANGE
21 21
from pulp.server.exceptions import PulpException, MissingResource, \
22 22
    PulpCodedException
23
from pulp.server.db.connection import suppress_connect_warning
23 24
from pulp.server.db.model import Worker, ReservedResource, TaskStatus
24 25
from pulp.server.exceptions import NoWorkers
25 26
from pulp.server.managers.repo import _common as common_utils
26 27
from pulp.server.managers import factory as managers
27 28
from pulp.server.managers.schedule import utils
28 29

  
29

  
30 30
controller = control.Control(app=celery)
31 31
_logger = logging.getLogger(__name__)
32 32

  
......
100 100
        """
101 101
        args = self._type_transform(args)
102 102
        kwargs = self._type_transform(kwargs)
103
        return super(PulpTask, self).__call__(*args, **kwargs)
103
        with suppress_connect_warning(_logger):
104
            return super(PulpTask, self).__call__(*args, **kwargs)
104 105

  
105 106

  
106 107
@task(base=PulpTask, acks_late=True)
......
450 451
        This overrides PulpTask's __call__() method. We use this method
451 452
        for task state tracking of Pulp tasks.
452 453
        """
453
        # Check task status and skip running the task if task state is 'canceled'.
454
        try:
455
            task_status = TaskStatus.objects.get(task_id=self.request.id)
456
        except DoesNotExist:
457
            task_status = None
458
        if task_status and task_status['state'] == constants.CALL_CANCELED_STATE:
459
            _logger.debug("Task cancel received for task-id : [%s]" % self.request.id)
460
            return
461
        # Update start_time and set the task state to 'running' for asynchronous tasks.
462
        # Skip updating status for eagerly executed tasks, since we don't want to track
463
        # synchronous tasks in our database.
464
        if not self.request.called_directly:
465
            now = datetime.now(dateutils.utc_tz())
466
            start_time = dateutils.format_iso8601_datetime(now)
467
            # Using 'upsert' to avoid a possible race condition described in the apply_async method
468
            # above.
469
            TaskStatus.objects(task_id=self.request.id).update_one(
470
                set__state=constants.CALL_RUNNING_STATE, set__start_time=start_time, upsert=True)
454
        with suppress_connect_warning(_logger):
455
            # Check task status and skip running the task if task state is 'canceled'.
456
            try:
457
                task_status = TaskStatus.objects.get(task_id=self.request.id)
458
            except DoesNotExist:
459
                task_status = None
460
            if task_status and task_status['state'] == constants.CALL_CANCELED_STATE:
461
                _logger.debug("Task cancel received for task-id : [%s]" % self.request.id)
462
                return
463
            # Update start_time and set the task state to 'running' for asynchronous tasks.
464
            # Skip updating status for eagerly executed tasks, since we don't want to track
465
            # synchronous tasks in our database.
466
            if not self.request.called_directly:
467
                now = datetime.now(dateutils.utc_tz())
468
                start_time = dateutils.format_iso8601_datetime(now)
469
                # Using 'upsert' to avoid a possible race condition described
470
                # in the apply_async method above.
471
                TaskStatus.objects(task_id=self.request.id).update_one(
472
                    set__state=constants.CALL_RUNNING_STATE, set__start_time=start_time,
473
                    upsert=True)
474

  
471 475
        # Run the actual task
472 476
        _logger.debug("Running task : [%s]" % self.request.id)
473 477
        return super(Task, self).__call__(*args, **kwargs)
......
491 495
                             % {'id': kwargs['scheduled_call_id']})
492 496
                utils.reset_failure_count(kwargs['scheduled_call_id'])
493 497
        if not self.request.called_directly:
494
            now = datetime.now(dateutils.utc_tz())
495
            finish_time = dateutils.format_iso8601_datetime(now)
496
            task_status = TaskStatus.objects.get(task_id=task_id)
497
            task_status['finish_time'] = finish_time
498
            task_status['result'] = retval
499

  
500
            # Only set the state to finished if it's not already in a complete state. This is
501
            # important for when the task has been canceled, so we don't move the task from canceled
502
            # to finished.
503
            if task_status['state'] not in constants.CALL_COMPLETE_STATES:
504
                task_status['state'] = constants.CALL_FINISHED_STATE
505
            if isinstance(retval, TaskResult):
506
                task_status['result'] = retval.return_value
507
                if retval.error:
508
                    task_status['error'] = retval.error.to_dict()
509
                if retval.spawned_tasks:
510
                    task_list = []
511
                    for spawned_task in retval.spawned_tasks:
512
                        if isinstance(spawned_task, AsyncResult):
513
                            task_list.append(spawned_task.task_id)
514
                        elif isinstance(spawned_task, dict):
515
                            task_list.append(spawned_task['task_id'])
516
                    task_status['spawned_tasks'] = task_list
517
            if isinstance(retval, AsyncResult):
518
                task_status['spawned_tasks'] = [retval.task_id, ]
519
                task_status['result'] = None
520

  
521
            task_status.save()
498
            with suppress_connect_warning(_logger):
499
                now = datetime.now(dateutils.utc_tz())
500
                finish_time = dateutils.format_iso8601_datetime(now)
501
                task_status = TaskStatus.objects.get(task_id=task_id)
502
                task_status['finish_time'] = finish_time
503
                task_status['result'] = retval
504

  
505
                # Only set the state to finished if it's not already in a complete state.
506
                # This is important for when the task has been canceled, so we don't move
507
                # the task from canceled to finished.
508
                if task_status['state'] not in constants.CALL_COMPLETE_STATES:
509
                    task_status['state'] = constants.CALL_FINISHED_STATE
510
                if isinstance(retval, TaskResult):
511
                    task_status['result'] = retval.return_value
512
                    if retval.error:
513
                        task_status['error'] = retval.error.to_dict()
514
                    if retval.spawned_tasks:
515
                        task_list = []
516
                        for spawned_task in retval.spawned_tasks:
517
                            if isinstance(spawned_task, AsyncResult):
518
                                task_list.append(spawned_task.task_id)
519
                            elif isinstance(spawned_task, dict):
520
                                task_list.append(spawned_task['task_id'])
521
                        task_status['spawned_tasks'] = task_list
522
                if isinstance(retval, AsyncResult):
523
                    task_status['spawned_tasks'] = [retval.task_id, ]
524
                    task_status['result'] = None
525

  
526
                task_status.save()
522 527
            common_utils.delete_working_directory()
523 528

  
524 529
    def on_failure(self, exc, task_id, args, kwargs, einfo):
server/pulp/server/db/connection.py
5 5
import logging
6 6
import ssl
7 7
import time
8
import warnings
9
from contextlib import contextmanager
8 10
from gettext import gettext as _
9 11

  
10 12
import mongoengine
13
import semantic_version
11 14
from pymongo.collection import Collection
12 15
from pymongo.errors import AutoReconnect, OperationFailure
13 16
from pymongo.son_manipulator import NamespaceInjector
14 17

  
15 18
from pulp.common import error_codes
16

  
17 19
from pulp.server import config
18 20
from pulp.server.compat import wraps
19 21
from pulp.server.exceptions import PulpCodedException, PulpException
20 22

  
21
import semantic_version
22

  
23 23

  
24 24
_CONNECTION = None
25 25
_DATABASE = None
......
168 168
        raise
169 169

  
170 170

  
171
@contextmanager
def suppress_connect_warning(logger):
    """
    A context manager that will suppress pymongo's connect before fork warning

    python's warnings module gives you a way to filter warnings (warnings.filterwarnings),
    and a way to catch warnings (warnings.catch_warnings), but not a way to do both. This
    context manager filters out the specific python warning about connecting before fork,
    while allowing all other warnings to be issued normally, so they aren't covered up
    by this context manager.

    The behavior seen here is based on the warnings.catch_warnings context manager, which
    also works by stashing the original showwarning function and replacing it with a custom
    function while the context is entered. warnings.showwarning is always put back when the
    context exits, including when the wrapped code raises.

    Nesting is supported: if warnings.showwarning has already been replaced by this context
    manager, the inner use leaves it alone and the outermost use performs the restore.

    The logger from the calling module is used to help identify which call to
    this context manager suppressed the pymongo warning.

    :param logger: logger from the module using this context manager
    :type logger: logging.Logger
    """
    original_showwarning = warnings.showwarning

    # __name__ is available on functions in every python version, whereas func_name only
    # exists on python 2. getattr with a default also copes with callables that carry no
    # name at all, in which case the current showwarning is simply treated as "not ours".
    installed_name = getattr(original_showwarning, '__name__', None)

    if installed_name == 'pymongo_suppressing_showwarning':
        # showwarning was already replaced by an enclosing use of this context manager,
        # so there is nothing to install and nothing to restore here
        yield
        return

    # this is effectively a functools.partial binding the passed-in logger and the original
    # showwarning function, but it has to be a real function: a partial is not accepted
    # where showwarning gets type-checked before being called. Its name is also what the
    # reentrancy check above looks for, so it must not be renamed.
    def pymongo_suppressing_showwarning(*args, **kwargs):
        return _pymongo_warning_catcher(logger, original_showwarning, *args, **kwargs)

    try:
        warnings.showwarning = pymongo_suppressing_showwarning
        yield
    finally:
        # whatever happens, restore the original showwarning function
        warnings.showwarning = original_showwarning
222

  
223

  
224
def _pymongo_warning_catcher(logger, showwarning, message, category, *args, **kwargs):
225
    """
226
    An implementation of warnings.showwarning that supresses pymongo's connect before work warning
227

  
228
    This is intended to be wrapped with functools.partial by the mechanism that replaces
229
    the warnings.showwarnings function, with the first two args being a list in which to store
230
    the caught pymongo warning(s), and the second being the original warnings.showwarnings
231
    function, through which all other warnings will be passed.
232

  
233
    :param caught: list to be populated with caught warnings for inspection in the caller
234
    :type caught: list
235
    :param showwarning: The "real" warnings.showwarning function, for passing unrelated warnings
236
    :type showwarning: types.FunctionType
237

  
238
    All remaining args are the same as warnings.showwarning, and are only used here for filtering
239
    """
240
    message_expected = 'MongoClient opened before fork'
241
    # message is an instance of category, which becomes the warning message when cast as str
242
    if category is UserWarning and message_expected in str(message):
243
        # warning is pymongo connect before fork warning, log it...
244
        logger.debug('pymongo reported connection before fork, ignoring')
245
        # ...and filter it out for the rest of this process's lifetime
246
        warnings.filterwarnings('ignore', message_expected)
247
    else:
248
        # not interested in this warning, run it through the provided showwarning function
249
        showwarning(message, category, *args, **kwargs)
250

  
251

  
171 252
def _connect_to_one_of_seeds(connection_kwargs, seeds_list, db_name):
172 253
    """
173 254
    Helper function to iterate over a list of database seeds till a successful connection is made
server/test/unit/server/db/test_connection.py
1 1
import unittest
2
import warnings
2 3

  
3 4
from mock import call, patch, MagicMock, Mock
4 5
from pymongo.errors import AutoReconnect
......
908 909
        final_answer = mock_func()
909 910
        m_logger.error.assert_called_once_with('mock_func operation failed on mock_coll')
910 911
        self.assertTrue(final_answer is 'final')
912

  
913

  
914
@patch('warnings.showwarning', autospec=True)
class TestSuppressBeforeForkWarning(unittest.TestCase):
    """
    Tests for the connection.suppress_connect_warning context manager.

    warnings.showwarning is replaced with an autospec'd mock for every test method, so each
    test can tell whether a warning was handed on to the "real" showwarning function.
    """

    def setUp(self):
        # Catching the pymongo warning registers a process-wide 'ignore' filter. Snapshot the
        # warnings module state here and restore it during cleanup so that filter cannot leak
        # from one test into the next and make the results depend on execution order.
        warnings_state = warnings.catch_warnings()
        warnings_state.__enter__()
        self.addCleanup(warnings_state.__exit__, None, None, None)

    def test_warning_suppressed(self, showwarning):
        logger = Mock()

        with connection.suppress_connect_warning(logger):
            self.assertIsNot(warnings.showwarning, warnings._show_warning)
            # The string to match is in this warning...
            warnings.warn('MongoClient opened before fork mock warning')
            # ...but not this warning
            warnings.warn('Mock warning unrelated to MongoClient')

        # two warnings were raised in-context: one should emit a debug log message,
        # the other should have called showwarning as-normal
        self.assertEqual(logger.debug.call_count, 1)
        self.assertEqual(showwarning.call_count, 1)

    def test_warning_restored(self, showwarning):
        logger = Mock()

        with connection.suppress_connect_warning(logger):
            # inside the context, warnings.showwarning has been replaced
            self.assertIsNot(warnings.showwarning, showwarning)

        # upon leaving the context, warnings.showwarning is restored
        self.assertIs(warnings.showwarning, showwarning)

    def test_warning_restored_after_exception(self, showwarning):
        logger = Mock()
        showwarning.side_effect = Exception('Oh no!')

        with connection.suppress_connect_warning(logger):
            self.assertIsNot(warnings.showwarning, showwarning)
            self.assertRaises(Exception, warnings.warn, 'This will explode.')

        # warnings.showwarning is restored even though an exception was raised
        # while showing a warning inside the context
        self.assertIs(warnings.showwarning, showwarning)

    def test_reentrant(self, showwarning):
        logger = Mock()

        with connection.suppress_connect_warning(logger):
            self.assertIsNot(warnings.showwarning, showwarning)
            suppressing_showwarning = warnings.showwarning
            with connection.suppress_connect_warning(logger):
                self.assertIsNot(warnings.showwarning, showwarning)

                # showwarning should not be replaced in this inner context, so the version
                # seen in the outer context should still be the current version seen
                self.assertIs(warnings.showwarning, suppressing_showwarning)

        # nesting suppress_connect_warning contexts does not implode the universe,
        # but does still restore showwarning
        self.assertIs(warnings.showwarning, showwarning)

    def test_warnings_ignored(self, showwarning):
        logger = Mock()

        with connection.suppress_connect_warning(logger):
            warnings.warn('MongoClient opened before fork mock warning')
        self.assertEqual(showwarning.call_count, 0)

        # after catching and logging the connect before fork warning, future warnings should be
        # ignored. verify this first by snooping around in warnings.filters and checking that
        # the first filter (and therefore newest, based on warnings.filterwarnings behavior)
        # is the one added by the suppress_connect_warning context manager
        action, regex = warnings.filters[0][:2]
        self.assertEqual(action, 'ignore')
        self.assertEqual(regex.pattern, 'MongoClient opened before fork')

        # also verify this by issuing a matching warning outside of the suppressing context,
        # and seeing that showwarning is not called
        warnings.warn('MongoClient opened before fork mock warning')
        self.assertEqual(showwarning.call_count, 0)
    (1-1/1)