| # This Source Code Form is subject to the terms of the Mozilla Public |
| # License, v. 2.0. If a copy of the MPL was not distributed with this |
| # file, You can obtain one at http://mozilla.org/MPL/2.0/. |
| |
from collections import deque

from .base import BaseHandler
| |
class BufferHandler(BaseHandler):
    """Handler that maintains a circular buffer of messages based on the
    size and actions specified by a user.

    While buffering is enabled, messages whose action is listed in
    ``buffered_actions`` are held back instead of being emitted; every
    other message is passed straight to ``inner``. When ``message_limit``
    is set, only the most recent ``message_limit`` messages are retained.

    Four callbacks are registered under the "buffer" topic through
    ``register_message_handlers``: "on", "off", "flush" and "clear".

    :param inner: The underlying handler used to emit messages.
    :param message_limit: The maximum number of messages to retain for
                          context. If None, the buffer will grow without
                          limit. If 0, buffered messages are discarded.
    :param buffered_actions: The set of actions to include in the buffer
                             rather than log directly. Defaults to
                             ``['log', 'test_status']``.
    """

    def __init__(self, inner, message_limit=100, buffered_actions=None):
        BaseHandler.__init__(self, inner)
        self.inner = inner
        self.message_limit = message_limit
        if buffered_actions is None:
            buffered_actions = ['log', 'test_status']
        self.buffered_actions = set(buffered_actions)
        self._buffering = True

        # A deque with maxlen drops the oldest entry on overflow, is
        # unbounded when maxlen is None and keeps nothing when maxlen is 0,
        # so the bounded and unbounded configurations share one code path.
        self._buffer = deque(maxlen=message_limit)

        self.register_message_handlers("buffer", {
            "on": self._enable_buffering,
            "off": self._disable_buffering,
            "flush": self._flush_buffered,
            "clear": self._clear_buffer,
        })

    def __call__(self, data):
        """Buffer ``data`` or forward it to the inner handler.

        A message carrying the ``bypass_mozlog_buffer`` key is always
        forwarded, with that key removed first. Messages are also forwarded
        when buffering is disabled or their action is not a buffered one.
        """
        action = data['action']
        if 'bypass_mozlog_buffer' in data:
            data.pop('bypass_mozlog_buffer')
            self.inner(data)
            return
        if not self._buffering or action not in self.buffered_actions:
            self.inner(data)
            return

        self._add_message(data)

    def _add_message(self, data):
        """Store ``data``, evicting the oldest message if the buffer is full."""
        self._buffer.append(data)

    def _enable_buffering(self):
        """Start holding back messages with a buffered action."""
        self._buffering = True

    def _disable_buffering(self):
        """Forward every message directly to the inner handler."""
        self._buffering = False

    def _clear_buffer(self):
        """Clear the buffer of unwanted messages.

        :returns: The number of messages that were discarded.
        """
        discarded = len(self._buffer)
        self._buffer.clear()
        return discarded

    def _flush_buffered(self):
        """Logs the contents of the current buffer, oldest message first.

        :returns: The number of messages removed from the buffer afterwards.
        """
        # Iterate over a snapshot: the inner handler may log again and
        # re-enter this handler, and a deque must not change size while it
        # is being iterated.
        for msg in list(self._buffer):
            self.inner(msg)
        return self._clear_buffer()