# -*- coding: utf-8 -*-
#
# This file is part of the parce Python package.
#
# Copyright © 2019-2020 by Wilbert Berendsen <info@wilbertberendsen.nl>
#
# This module is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This module is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
Various utility classes and functions.
This module only depends on the Python standard library.
"""
import bisect
import codecs
import contextlib
import functools
import os.path
import sys
import threading
import types
import weakref
class Dispatcher:
    """Dispatches calls via an instance to methods based on the first argument.

    A Dispatcher is used as a decorator when defining a class, and then called
    via or in an instance to select a method based on the first argument (which
    must be hashable).

    If you override methods in a subclass that were dispatched, the
    dispatcher automatically dispatches to the new method. If you want to add
    new keywords in a subclass, just create a new Dispatcher with the same
    name. It will automatically inherit the references stored in the old
    dispatcher.

    Usage::

        class MyClass:
            dispatch = Dispatcher()

            @dispatch(1)
            def call_one(self, value):
                print("One called", value)

            @dispatch(2)
            def call_two(self, value):
                print("Two called", value)

            def handle_input(self, number, value):
                self.dispatch(number, value)

        >>> i = MyClass()
        >>> i.handle_input(2, 3)
        Two called 3

    Values of the first argument that are not handled are normally silently
    ignored, but you can also specify a default function, that is called when
    a value is not handled::

        class MyClass:
            @Dispatcher
            def dispatch(self, number, value):
                print("Default function called:", number, value)

            @dispatch(1)
            def call_one(self, value):
                print("One called", value)

            @dispatch(2)
            def call_two(self, value):
                print("Two called", value)

            def handle_input(self, number, value):
                self.dispatch(number, value)

        >>> i = MyClass()
        >>> i.handle_input(3, 10)
        Default function called: 3 10

    To get the method for a key without calling it directly, e.g. to see if
    a method exists for a key, use::

        >>> meth = i.dispatch.get(1)  # returns a bound method
        >>> if meth:
        ...     meth("hi there")
        ...
        One called hi there

    If you specified a default method on creation of the dispatcher, that
    method is also accessible, in the ``default`` attribute::

        >>> i.dispatch.default(1, 2)
        Default function called: 1 2

    """
    def __init__(self, default_func=None):
        # guards the lazy, per-owner-class construction of the method table
        self._lock = threading.Lock()
        # maps key -> method *name*; names (not functions) are stored so that
        # subclass overrides are picked up when resolving per owner class
        self._table = {}
        # per owner class cache: maps key -> actual function
        self._tables = weakref.WeakKeyDictionary()
        # optional fallback, exposed as the ``default`` attribute of the
        # bound _Dispatcher and called with (key, *args, **kwargs)
        self._default_func = default_func

    def __set_name__(self, owner, name):
        # remember the attribute name, so same-named Dispatchers in base
        # classes can be found and inherited from in __get__
        self._name = name

    def __call__(self, *args):
        """Return a decorator that registers the function for the given key(s)."""
        def decorator(func):
            for a in args:
                self._table[a] = func.__name__
            return func
        return decorator

    def __get__(self, instance, owner):
        # Resolve (and cache) the key -> function table for this owner class,
        # then return a lightweight bound helper.
        try:
            table = self._tables[owner]
        except KeyError:
            with self._lock:
                # double-checked locking: another thread may have built the
                # table while we waited for the lock
                try:
                    table = self._tables[owner]
                except KeyError:
                    # find Dispatchers in base classes with the same name
                    # if found, inherit their references
                    dispatchers = []
                    for c in owner.mro():
                        d = c.__dict__.get(self._name)
                        if type(d) is type(self):
                            dispatchers.append(d)
                    _table = {}
                    for d in reversed(dispatchers):
                        _table.update(d._table)
                    # now, store the actual functions instead of their names
                    table = self._tables[owner] = {a: getattr(owner, name)
                                                   for a, name in _table.items()}
        return _Dispatcher(self, table, instance, owner)
class _Dispatcher:
    """Bound helper returned by :meth:`Dispatcher.__get__`.

    Dispatches a call to the method registered for a key, and gives access
    to individual bound methods (:meth:`get`) and to the bound default
    method (the ``default`` property).
    """
    __slots__ = ("_dispatcher", "_table", "_instance", "_owner")

    def __init__(self, dispatcher, table, instance, owner):
        self._dispatcher = dispatcher
        self._table = table
        self._instance = instance
        self._owner = owner

    def __repr__(self):
        dispatcher = self._dispatcher
        return "<{}.{} {}.{} of {}>".format(
            type(dispatcher).__module__,
            type(dispatcher).__name__,
            self._owner.__name__,
            dispatcher._name,
            repr(self._instance))

    def __call__(self, key, *args, **kwargs):
        """Call the stored method based on the key (first argument) with the
        other arguments."""
        func = self._table.get(key)
        if func:
            return func(self._instance, *args, **kwargs)
        # unhandled key: fall back to the default method, if one was given
        fallback = self.default
        if fallback:
            return fallback(key, *args, **kwargs)

    @property
    def default(self):
        """The bound method specified as default, if any."""
        func = self._dispatcher._default_func
        if func:
            return func.__get__(self._instance, self._owner)

    def get(self, key):
        """Return the bound method for the key, without calling it."""
        func = self._table.get(key)
        if func:
            return func.__get__(self._instance, self._owner)
class _Observer:
    """Helper for the Observable class, wrapping one connected callback.

    The magic lt/gt methods order observers on priority (used by the sorted
    insertion in connect) and the eq/ne methods compare the stored callable,
    so membership tests can find the observer for a given function.
    """
    __slots__ = ('func', 'once', 'priority', 'call')

    def __init__(self, func, once=None, prepend_self=False, priority=0):
        if isinstance(func, types.MethodType):
            # store bound methods weakly, so that connecting a method does
            # not keep its receiver alive
            func = weakref.WeakMethod(func)
            if prepend_self:
                self.call = self.call_weakmethod_with_self
            else:
                self.call = self.call_weakmethod
        elif prepend_self:
            # a plain callable that wants the observable as first argument
            # already has exactly the signature call() is invoked with
            self.call = func
        else:
            self.call = self.call_func
        self.func = func
        self.once = once
        self.priority = priority

    def __repr__(self):
        return "<Observer for {}>".format(self.func)

    def __eq__(self, other):
        return self.func == other.func if type(other) is _Observer else NotImplemented

    def __ne__(self, other):
        return self.func != other.func if type(other) is _Observer else NotImplemented

    def __lt__(self, other):
        return self.priority < other.priority if type(other) is _Observer else NotImplemented

    def __gt__(self, other):
        return self.priority > other.priority if type(other) is _Observer else NotImplemented

    def call_func(self, observable, *args, **kwargs):
        # plain function: the observable itself is not passed on
        return self.func(*args, **kwargs)

    def call_weakmethod(self, observable, *args, **kwargs):
        method = self.func()
        if method:
            return method(*args, **kwargs)
        self.once = True    # receiver died; mark for removal after this emit

    def call_weakmethod_with_self(self, observable, *args, **kwargs):
        method = self.func()
        if method:
            return method(observable, *args, **kwargs)
        self.once = True    # receiver died; mark for removal after this emit
class Observable:
    """Simple base class for objects that need to announce events.

    Use :meth:`connect` to add a callable to be called when a certain event
    occurs.

    To announce an event from inside methods, use :meth:`emit`. In your
    documentation you should specify *which* arguments are used for *which*
    events; in order to keep this class simple and fast, no checking is
    performed whatsoever.

    Example::

        >>> o = Observable()
        >>>
        >>> def slot(arg):
        ...     print("slot called:", arg)
        ...
        >>> o.connect('test', slot)
        >>>
        >>> o.emit('test', 1)   # in a method of your Observable subclass
        slot called: 1

    It is also possible to use :meth:`emit` in a :ref:`with <with>` context. In
    that case the return values of the connected functions are collected and if
    they are a context manager, they are entered as well. An example::

        >>> import contextlib
        >>>
        >>> @contextlib.contextmanager
        ... def f():
        ...     print("one")
        ...     yield
        ...     print("two")
        ...
        >>> o = Observable()
        >>> o.connect('test', f)
        >>>
        >>> with o.emit('test'):
        ...     print("Yo!!!")
        ...
        one
        Yo!!!
        two

    This enables you to announce events, and connected objects can perform a
    task before the event's context starts and another task when the event's
    context exits.

    """
    def __init__(self):
        # maps event -> list of _Observer objects, kept sorted on priority
        self._callbacks = {}

    def connect(self, event, func, once=False, prepend_self=False, priority=0):
        """Register a function to be called when a certain event occurs.

        The ``event`` should be a string or any hashable object that identifies
        the event. The ``priority`` determines the order the functions are
        called. Lower numbers are called first. If ``once`` is set to True, the
        function is called once and then removed from the list of callbacks. If
        ``prepend_self`` is True, the callback is called with the observable
        itself as first argument.

        If the ``func`` is a method, it is stored using a weak reference.

        """
        observer = _Observer(func, once, prepend_self, priority)
        slots = self._callbacks.setdefault(event, [])
        if observer not in slots:
            # insort_right keeps equal priorities in connection order
            bisect.insort_right(slots, observer)

    def disconnect(self, event, func):
        """Remove a previously registered callback function."""
        try:
            slots = self._callbacks[event]
        except KeyError:
            return
        # a throwaway _Observer compares equal on func, so list.remove finds it
        observer = _Observer(func)
        try:
            slots.remove(observer)
        except ValueError:
            return
        if not slots:
            # drop empty event entries so has_connections stays accurate
            del self._callbacks[event]

    def disconnect_all(self, event=None):
        """Disconnect all functions (from the event).

        If event is None, disconnects all connected functions from all events.

        """
        if event is None:
            self._callbacks.clear()
        else:
            try:
                del self._callbacks[event]
            except KeyError:
                pass

    def has_connections(self, event):
        """Return True when there is at least one callback registered for the event.

        This can be used before performing some task; the task maybe then can
        be optimized because we know nobody needs the events.

        """
        return event in self._callbacks

    def is_connected(self, event, func):
        """Return True if func is connected to event."""
        try:
            slots = self._callbacks[event]
        except KeyError:
            return False
        # equality of _Observer compares the stored callable
        return _Observer(func) in slots

    def emit(self, event, *args, **kwargs):
        """Call all callbacks for the event.

        Returns a :class:`contextlib.ExitStack` instance. When any of the
        connected callbacks returns a context manager, that context is entered,
        and added to the exit stack, so it is exited when the exit stack is
        exited.

        """
        s = contextlib.ExitStack()
        try:
            slots = self._callbacks[event]
        except KeyError:
            return s
        disconnect = []
        for i, observer in enumerate(slots):
            result = observer.call(self, *args, **kwargs)
            try:
                # best-effort: only results that actually are context
                # managers are entered; anything else raises and is ignored
                s.enter_context(result)
            except Exception:
                pass
            if observer.once:
                # one-shot observers and dead weak methods are collected
                # here and removed below (not while iterating)
                disconnect.append(i)
        if disconnect:
            # delete back to front so the collected indices stay valid
            for i in reversed(disconnect):
                del slots[i]
            if not slots:
                del self._callbacks[event]
        return s
class Switch:
    """A context manager that evaluates to True when in a context, else to False.

    Example::

        clicking = Switch()

        def myfunc():
            with clicking:
                blablabl()

        # and elsewhere:
        def blablabl():
            if not clicking:
                do_something()

        # when blablabl() is called from myfunc, clicking evaluates to True,
        # so do_something() is not called then.

    A Switch can also be used in a class definition; via the descriptor
    protocol it will then create per-instance Switch objects which will be
    stored using a weak reference to the instance. For example::

        class MyClass:
            clicking = Switch()

            def click_event(self, event):
                with self.clicking:
                    self.blablabla()

            def blablabla(self):
                do_something()
                if not self.clicking:
                    # this only runs when blablabla() was not called
                    # from click_event()
                    update_something()

    """
    __slots__ = ('_value',)

    def __init__(self):
        # nesting counter; replaced by a WeakKeyDictionary mapping instance ->
        # Switch on first per-instance (descriptor) access
        self._value = 0

    def __enter__(self):
        self._value += 1

    def __exit__(self, exc_type, exc_val, exc_tb):
        self._value -= 1

    def __bool__(self):
        return bool(self._value)

    def __get__(self, instance, owner):
        if instance is None:
            # class-level access returns the descriptor itself; previously
            # this crashed (None is not weak-referenceable) and clobbered
            # the per-instance store on the way
            return self
        if not isinstance(self._value, weakref.WeakKeyDictionary):
            # first per-instance access: the plain counter (still 0 when used
            # only as a class attribute) becomes a weak per-instance mapping
            self._value = weakref.WeakKeyDictionary()
        try:
            return self._value[instance]
        except KeyError:
            s = self._value[instance] = type(self)()
            return s
def object_locker():
    """Return a callable that can hold a lock on an object.

    The Lock is automatically created when requested for the first time, and
    deleted when released for the last time. Keeps a reference to the object
    until the last lock is released.

    Usage example::

        >>> lock = object_locker()
        >>> with lock(obj):
        ...     do_something()

    The lock callable should remain alive as long as the object is alive, so it
    is reused; it is the context where the locking is active. This function is
    an alternative to storing a Lock on every object (e.g. assigning
    ``self._lock = threading.Lock()`` in ``__init__``), which might not be
    desirable if you have a large amount of objects of this type.
    """
    # maps object -> [lock, number of current users]; the refcount makes sure
    # the entry is only deleted when the *last* user releases it. (Deleting it
    # when the first user finished -- as a naive implementation does -- lets a
    # newcomer create a second Lock while another thread still holds the old
    # one, defeating the mutual exclusion.)
    locks = {}
    mutex = threading.Lock()

    @contextlib.contextmanager
    def lock_object(obj):
        """Return a context manager holding the lock for ``obj``."""
        with mutex:
            try:
                entry = locks[obj]
                entry[1] += 1
            except KeyError:
                entry = locks[obj] = [threading.Lock(), 1]
        try:
            with entry[0]:
                yield
        finally:
            with mutex:
                entry[1] -= 1
                if not entry[1]:
                    # last user gone: drop the lock and our reference to obj
                    del locks[obj]
    return lock_object
def cached_method(func):
    """Wrap a method and cache its return value.

    The method argument tuple should be hashable. Keyword arguments are not
    supported. The cache is thread-safe (one lock per instance, via
    :func:`object_locker`). Does not keep a reference to the instance.
    """
    lock = object_locker()
    # instance -> {args: result}; weak keys so instances can be collected
    cache = weakref.WeakKeyDictionary()

    @functools.wraps(func)
    def wrapper(self, *args):
        with lock(self):
            results = cache.setdefault(self, {})
            try:
                return results[args]
            except KeyError:
                value = results[args] = func(self, *args)
                return value
    return wrapper
def cached_property(func):
    """Like :func:`property`, but caches the computed value per instance."""
    getter = cached_method(func)
    return property(getter)
def cached_func(func):
    """Wrap a normal function and cache the return value.

    The function's argument tuple should be hashable; keyword arguments are
    not supported. The cache is thread-safe and keyed on the argument tuple.
    """
    store = caching_dict(func, True)

    @functools.wraps(func)
    def wrapper(*args):
        # the caching dict calls func(*args) on a miss
        return store[args]
    return wrapper
def caching_dict(func, unpack=False, cache_none=True):
    """Create a dict with a thread-safe factory function for missing keys.

    When a key is not present, the factory function is called. The difference
    with :class:`collections.defaultdict` is that the factory function is
    called with the key as argument, or, if ``unpack`` is set to True, with the
    key arguments unpacked. Built-in locking makes sure another thread cannot
    call the factory function at the same time.

    If ``cache_none`` is set to False and the function returns None, that
    result is not cached, meaning that the function is run again on the next
    request.
    """
    mutex = threading.Lock()
    # normalize the two calling conventions into one callable taking the key
    call = (lambda key: func(*key)) if unpack else func
    if cache_none:
        def produce(mapping, key):
            value = mapping[key] = call(key)
            return value
    else:
        def produce(mapping, key):
            value = call(key)
            if value is not None:
                mapping[key] = value
            return value

    class _Cache(dict):
        def __getitem__(self, key):
            # the lock is held while the factory runs, so two threads never
            # compute the same key concurrently
            with mutex:
                try:
                    return super().__getitem__(key)
                except KeyError:
                    return produce(self, key)
    return _Cache()
def file_cache(func):
    """Return a dict that caches the factory function results.

    The function should accept one argument which is assumed to be a filename.
    The result value is cached, but not returned anymore when the mtime of the
    file has changed; in that case the function is called again. If the mtime
    of the file can't be determined the function result is not cached.
    """
    mutex = threading.Lock()

    class FileCache(dict):
        def __getitem__(self, filename):
            with mutex:
                try:
                    cached_mtime, value = super().__getitem__(filename)
                except KeyError:
                    cached_mtime = -1   # sentinel: nothing cached yet
                try:
                    mtime = os.path.getmtime(filename)
                except OSError:
                    mtime = -2          # sentinel: file not stat-able
                if cached_mtime != mtime:
                    # first request, changed file, or stat failure: recompute
                    value = func(filename)
                    if mtime >= 0:
                        # only cache when we know a real mtime to compare with
                        self[filename] = mtime, value
                return value
    return FileCache()
class Symbol:
    """An unique object that has a name; the same name returns the same object.

    ``Symbol("x") is Symbol("x")`` always holds, because construction is
    memoized (see ``__new__``).
    """
    def __repr__(self):
        # a symbol prints as its bare name
        return self._name

    # cached_func memoizes on the full argument tuple (cls, name), so every
    # call with the same name yields the identical instance
    @cached_func
    def __new__(cls, name):
        obj = object.__new__(cls)
        obj._name = name
        return obj
def fix_boundaries(stream, start, end):
    """Yield all items from the stream of tuples.

    The first two items of each tuple are regarded as pos and end. This
    function adjusts the pos of the first item and the end of the last item so
    that they do not stick out of the range start..end. If the pos of the first
    item is below start, it is set to start; if the end of the last item is
    beyond end, it is set to end.

    If start == 0, the first item will never be adjusted; if end is None, the
    last item will not be adjusted.
    """
    if start == 0 and end is None:
        # nothing could ever be clipped
        yield from stream
        return
    it = iter(stream)
    try:
        current = next(it)
    except StopIteration:
        return
    # clip the left edge of the first item
    if current[0] < start:
        current = type(current)((start, current[1], *current[2:]))
    # yield all but the last item unchanged, keeping one item of lookahead
    for nxt in it:
        yield current
        current = nxt
    # clip the right edge of the last item
    if end is not None and current[1] > end:
        current = type(current)((current[0], end, *current[2:]))
    yield current
def merge_adjacent(stream, factory=tuple):
    """Yield items from a stream of tuples.

    The first two items of each tuple are regarded as pos and end.
    If they are adjacent, and the rest of the tuples compares the same,
    the items are merged.

    Instead of the default factory `tuple`, you can give a named tuple
    or any other type to wrap the stream's items in; it is called with
    pos, end and the remaining fields as positional arguments.
    """
    # tuple() accepts a single iterable, not separate positional arguments,
    # so the default factory needs a wrapper collecting *args (calling
    # tuple(pos, end, ...) directly raises TypeError).
    make = (lambda *args: args) if factory is tuple else factory
    stream = iter(stream)
    for pos, end, *rest in stream:
        for npos, nend, *nrest in stream:
            if nrest != rest or npos > end:
                # not mergeable (different payload or a gap): flush the
                # accumulated range and start a new one
                yield make(pos, end, *rest)
                pos, rest = npos, nrest
            end = nend
        # flush the final accumulated range
        yield make(pos, end, *rest)
def merge_adjacent_actions(tokens):
    """Yield three-tuples (pos, end, action).

    Adjacent actions that are the same are merged into
    one range.
    """
    # delegate the merging to merge_adjacent; the action is the payload
    # that must compare equal for two ranges to be merged
    return merge_adjacent((t.pos, t.end, t.action) for t in tokens)
def merge_adjacent_actions_with_language(tokens):
    """Yield four-tuples (pos, end, action, language).

    Adjacent actions that are the same and occurred in the same language
    are merged into one range.
    """
    # both action and language are part of the payload, so ranges merge only
    # when the two compare equal
    return merge_adjacent((t.pos, t.end, t.action, t.parent.lexicon.language)
                          for t in tokens)
def get_bom_encoding(data):
    """Get the BOM (Byte Order Mark) of bytes ``data``, if any.

    A two-tuple is returned (encoding, data). If the data starts with a BOM
    mark, its encoding is determined and the BOM mark is stripped off.
    Otherwise, the returned encoding is None and the data is returned
    unchanged.
    """
    # The UTF-32 BOMs must be tested before the UTF-16 ones: BOM_UTF16_LE
    # (ff fe) is a prefix of BOM_UTF32_LE (ff fe 00 00), so testing UTF-16
    # first would misdetect UTF-32 LE data.
    for bom, encoding in (
        (codecs.BOM_UTF8, 'utf-8'),
        (codecs.BOM_UTF32_LE, 'utf_32_le'),
        (codecs.BOM_UTF32_BE, 'utf_32_be'),
        (codecs.BOM_UTF16_LE, 'utf_16_le'),
        (codecs.BOM_UTF16_BE, 'utf_16_be'),
    ):
        if data.startswith(bom):
            return encoding, data[len(bom):]
    return None, data
def split_list(l, separator):
    """Split list on items that compare equal to separator.

    Yields result lists that may be empty.
    """
    pos = 0
    while True:
        try:
            found = l.index(separator, pos)
        except ValueError:
            # no further separator: the remainder (possibly empty) ends it
            yield l[pos:]
            return
        yield l[pos:found]
        pos = found + 1
def unroll(obj):
    """Unroll a tuple or list.

    If the object is a tuple or list, yields the unrolled members recursively.
    Otherwise just the object itself is yielded.
    """
    if type(obj) not in (tuple, list):
        yield obj
        return
    # iterative depth-first walk with an explicit stack of iterators, so
    # deeply nested structures do not hit the recursion limit
    stack = [iter(obj)]
    while stack:
        for item in stack[-1]:
            if type(item) in (tuple, list):
                # descend: push the nested iterator and restart on it
                stack.append(iter(item))
                break
            yield item
        else:
            # current level exhausted: go back up
            stack.pop()
def tokens(nodes, reverse=False):
    """Helper to yield tokens from the iterable of nodes.

    If ``reverse`` is set to True, yields the tokens of the nodes in backward
    direction.
    """
    iterable = reversed(nodes) if reverse else nodes
    for node in iterable:
        if node.is_token:
            yield node
        else:
            # a context: recurse into it, keeping the direction
            yield from node.tokens(reverse)
def language_sister_class(language, template, base, try_parents=False):
    """Find a ``language`` sister class in the same module, with a name that
    matches the ``template``, and which is a subclass of ``base``.

    If ``try_parents`` is True, the parent classes of the language class are
    checked if a sister class is not found.

    The template string should contain ``{}``, which is replaced with the
    language's class name. Returns None if no sister class is defined.

    Example::

        >>> from parce.util import language_sister_class
        >>> from parce.lang.json import Json
        >>> from parce.transform import Transform
        >>> language_sister_class(Json, "{}Transform", Transform)
        <class 'parce.lang.json.JsonTransform'>

    """
    # mro()[:-2] drops the two topmost bases (parce.Language and object),
    # which never have sister classes
    candidates = language.mro()[:-2] if try_parents else [language]
    for cls in candidates:
        module = sys.modules[cls.__module__]
        sister = getattr(module, template.format(cls.__name__), None)
        if isinstance(sister, type) and issubclass(sister, base):
            return sister