The util module
Various utility classes and functions.
This module only depends on the Python standard library.
- class Dispatcher(default_func=None)
Bases: object
Dispatches calls via an instance to methods based on the first argument.
A Dispatcher is used as a decorator when defining a class, and is then called via an instance to select a method based on the first argument (which must be hashable).
If you override methods in a subclass that were dispatched, the dispatcher automatically dispatches to the new method. If you want to add new keywords in a subclass, just create a new Dispatcher with the same name; it automatically inherits the references stored in the old dispatcher.
Usage:

class MyClass:
    dispatch = Dispatcher()

    @dispatch(1)
    def call_one(self, value):
        print("One called", value)

    @dispatch(2)
    def call_two(self, value):
        print("Two called", value)

    def handle_input(self, number, value):
        self.dispatch(number, value)

>>> i = MyClass()
>>> i.handle_input(2, 3)
Two called 3
Values of the first argument that are not handled are normally silently ignored, but you can also specify a default function, which is called when a value is not handled:
class MyClass:
    @Dispatcher
    def dispatch(self, number, value):
        print("Default function called:", number, value)

    @dispatch(1)
    def call_one(self, value):
        print("One called", value)

    @dispatch(2)
    def call_two(self, value):
        print("Two called", value)

    def handle_input(self, number, value):
        self.dispatch(number, value)

>>> i = MyClass()
>>> i.handle_input(3, 10)
Default function called: 3 10
To get the method for a key without calling it directly, e.g. to see if a method exists for a key, use:
>>> meth = i.dispatch.get(1)    # returns a bound method
>>> if meth:
...     meth("hi there")
...
One called hi there
If you specified a default method on creation of the dispatcher, that method is also accessible, in the default attribute:

>>> i.dispatch.default(1, 2)
Default function called: 1 2
- class Observable
Bases: object
Simple base class for objects that need to announce events.
Use connect() to add a callable to be called when a certain event occurs. To announce an event from inside methods, use emit(). In your documentation you should specify which arguments are used for which events; in order to keep this class simple and fast, no checking is performed whatsoever.
Example:

>>> o = Observable()
>>>
>>> def slot(arg):
...     print("slot called:", arg)
...
>>> o.connect('test', slot)
>>>
>>> o.emit('test', 1)    # in a method of your Observable subclass
slot called: 1
It is also possible to use emit() in a with context. In that case the return values of the connected functions are collected, and if they are context managers, they are entered as well. An example:

>>> import contextlib
>>>
>>> @contextlib.contextmanager
... def f():
...     print("one")
...     yield
...     print("two")
...
>>> o = Observable()
>>> o.connect('test', f)
>>>
>>> with o.emit('test'):
...     print("Yo!!!")
...
one
Yo!!!
two
This enables you to announce events, and connected objects can perform a task before the event’s context starts and another task when the event’s context exits.
- connect(event, func, once=False, prepend_self=False, priority=0)
Register a function to be called when a certain event occurs.
The event should be a string or any hashable object that identifies the event. The priority determines the order in which the functions are called: lower numbers are called first. If once is set to True, the function is called once and then removed from the list of callbacks. If prepend_self is True, the callback is called with the observable itself as first argument.
If func is a method, it is stored using a weak reference.
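The ordering and once semantics described above can be sketched with a minimal stand-in (MiniObservable is a hypothetical illustration, not parce's implementation; weak references and prepend_self are omitted):

```python
class MiniObservable:
    """Sketch of the connect()/emit() ordering rules: callbacks run in
    priority order (lower numbers first), once=True callbacks run only once."""

    def __init__(self):
        self._callbacks = {}   # event -> list of (priority, once, func)

    def connect(self, event, func, once=False, priority=0):
        lst = self._callbacks.setdefault(event, [])
        lst.append((priority, once, func))
        lst.sort(key=lambda item: item[0])   # lower priority numbers first

    def emit(self, event, *args, **kwargs):
        lst = self._callbacks.get(event, [])
        for item in list(lst):
            item[2](*args, **kwargs)
            if item[1]:                      # once=True: remove after the call
                lst.remove(item)

calls = []
o = MiniObservable()
o.connect('evt', lambda: calls.append('late'), priority=10)
o.connect('evt', lambda: calls.append('early'), priority=-10)
o.connect('evt', lambda: calls.append('one-shot'), once=True)
o.emit('evt')
o.emit('evt')
# calls == ['early', 'one-shot', 'late', 'early', 'late']
```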
- disconnect_all(event=None)
Disconnect all functions (from the event).
If event is None, disconnects all connected functions from all events.
- has_connections(event)
Return True when there is at least one callback registered for the event.
This can be used before performing some task; if nobody needs the event, the task can be skipped or otherwise optimized.
- emit(event, *args, **kwargs)
Call all callbacks for the event.
Returns a contextlib.ExitStack instance. When any of the connected callbacks returns a context manager, that context is entered and added to the exit stack, so it is exited when the exit stack is exited.
- class Switch
Bases: object
A context manager that evaluates to True when in a context, else to False.
Example:
clicking = Switch()

def myfunc():
    with clicking:
        blablabl()

# and elsewhere:
def blablabl():
    if not clicking:
        do_something()

# when blablabl() is called from myfunc, clicking evaluates to True,
# so do_something() is not called then.
A Switch can also be used in a class definition; via the descriptor protocol it will then create per-instance Switch objects which will be stored using a weak reference to the instance. For example:
class MyClass:
    clicking = Switch()

    def click_event(self, event):
        with self.clicking:
            self.blablabla()

    def blablabla(self):
        do_something()
        if not self.clicking:
            # this only runs when blablabla() was not called
            # from click_event()
            update_something()
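The truth-testing behavior could be implemented roughly as follows (MiniSwitch is a hypothetical sketch; the descriptor protocol mentioned above is omitted):

```python
class MiniSwitch:
    """Sketch of a Switch: evaluates to True while any context using
    it is active; a counter makes nested use work."""

    def __init__(self):
        self._count = 0

    def __enter__(self):
        self._count += 1

    def __exit__(self, exc_type, exc, tb):
        self._count -= 1

    def __bool__(self):
        return self._count > 0

clicking = MiniSwitch()
before = bool(clicking)
with clicking:
    inside = bool(clicking)   # True inside the context
after = bool(clicking)        # False again outside
```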
- object_locker()
Return a callable that can hold a lock on an object.
The Lock is automatically created when requested for the first time, and deleted when released for the last time. Keeps a reference to the object until the last lock is released.
Usage example:
>>> lock = object_locker()
>>> with lock(obj):
...     do_something()
The lock callable should be kept alive (and reused) for as long as the objects it locks are alive; it is the context within which the locking is effective. This function is an alternative to:
>>> class Object:
...     def __init__(self):
...         self._lock = threading.Lock()
...
>>> o = Object()
and then later:
>>> with o._lock:
...     do_something()
In this use case the allocated Lock lives as long as the object, which might not be desirable if you have a large number of objects of this type.
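The create-on-demand, delete-on-last-release behavior described above could look roughly like this (mini_object_locker is a hypothetical sketch, not parce's implementation):

```python
import threading
from contextlib import contextmanager

def mini_object_locker():
    """Sketch of object_locker: a per-object lock created on first use
    and deleted when released for the last time; the entry also keeps
    the object referenced while any lock on it is held."""
    locks = {}                 # id(obj) -> [lock, holders, obj]
    guard = threading.Lock()   # protects the locks dict itself

    @contextmanager
    def lock(obj):
        with guard:
            entry = locks.setdefault(id(obj), [threading.Lock(), 0, obj])
            entry[1] += 1
        with entry[0]:
            try:
                yield
            finally:
                with guard:
                    entry[1] -= 1
                    if entry[1] == 0:      # last holder released: clean up
                        del locks[id(obj)]

    return lock

lock = mini_object_locker()
obj = object()
with lock(obj):
    held = True    # the lock for obj exists only while it is in use
```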
- cached_method(func)
Wrap a method and cache its return value.
The method's argument tuple should be hashable. Keyword arguments are not supported. The cache is thread-safe. Does not keep a reference to the instance.
- cached_func(func)
Wrap a normal function and cache its return value.
The function's argument tuple should be hashable; keyword arguments are not supported. The cache is thread-safe.
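The thread-safe caching idea behind cached_func (and cached_method) can be sketched like this (mini_cached_func is a hypothetical stand-in; the real cached_method additionally avoids keeping a reference to the instance):

```python
import functools
import threading

def mini_cached_func(func):
    """Sketch: cache results keyed on the positional-argument tuple;
    keyword arguments are unsupported, as documented."""
    cache = {}
    lock = threading.Lock()

    @functools.wraps(func)
    def wrapper(*args):
        try:
            return cache[args]
        except KeyError:
            with lock:
                if args not in cache:   # re-check under the lock
                    cache[args] = func(*args)
                return cache[args]

    return wrapper

calls = []

@mini_cached_func
def square(x):
    calls.append(x)
    return x * x

first = square(4)
second = square(4)    # served from the cache; square() runs only once
```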
- caching_dict(func, unpack=False, cache_none=True)
Create a dict with a thread-safe factory function for missing keys.
When a key is not present, the factory function is called. The difference with collections.defaultdict is that the factory function is called with the key as argument, or, if unpack is set to True, with the key arguments unpacked. Built-in locking makes sure another thread cannot call the factory function at the same time.
If cache_none is set to False and the function returns None, that result is not cached, meaning that the function is run again on the next request.
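A rough sketch of this behavior, assuming a `__missing__`-based dict (mini_caching_dict is hypothetical, not parce's implementation):

```python
import threading

def mini_caching_dict(func, unpack=False, cache_none=True):
    """Sketch of caching_dict: missing keys are computed by calling
    func with the key (or the unpacked key), under a lock."""
    lock = threading.Lock()

    class CachingDict(dict):
        def __missing__(self, key):
            with lock:
                if key in self:    # another thread computed it meanwhile
                    return self[key]
                value = func(*key) if unpack else func(key)
                if value is not None or cache_none:
                    self[key] = value
                return value

    return CachingDict()

d = mini_caching_dict(lambda a, b: a + b, unpack=True)
value = d[(1, 2)]        # the key tuple is unpacked into two arguments
```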
- file_cache(func)
Return a dict that caches the factory function results.
The function should accept one argument, which is assumed to be a filename. The result value is cached, but the cached value is discarded when the mtime of the file has changed; in that case the function is called again. If the mtime of the file can't be determined, the function result is not cached.
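The mtime-based invalidation could be sketched as follows (mini_file_cache and read_file are hypothetical names; the real file_cache is a dict, while this sketch wraps the function directly):

```python
import os
import tempfile

def mini_file_cache(func):
    """Sketch: cache one result per filename, discarded when the
    file's mtime changes; don't cache if the mtime is unknown."""
    cache = {}   # filename -> (mtime, result)

    def wrapper(filename):
        try:
            mtime = os.path.getmtime(filename)
        except OSError:
            return func(filename)          # mtime unknown: don't cache
        entry = cache.get(filename)
        if entry is None or entry[0] != mtime:
            entry = (mtime, func(filename))
            cache[filename] = entry
        return entry[1]

    return wrapper

# tiny demonstration with a temporary file
calls = []

@mini_file_cache
def read_file(filename):
    calls.append(filename)
    with open(filename) as f:
        return f.read()

fd, path = tempfile.mkstemp()
os.write(fd, b"hello")
os.close(fd)
first = read_file(path)
second = read_file(path)     # served from the cache
os.utime(path, (0, 0))       # pretend the file was modified
third = read_file(path)      # cache entry discarded, file read again
os.remove(path)
```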
- class Symbol(name)
Bases: object
A unique object that has a name; the same name returns the same object.
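The same-name-same-object guarantee is a form of interning, which could be sketched like this (MiniSymbol is a hypothetical stand-in):

```python
class MiniSymbol:
    """Sketch of Symbol: a named, interned object; constructing it
    with the same name returns the same instance."""
    _instances = {}

    def __new__(cls, name):
        try:
            return cls._instances[name]
        except KeyError:
            obj = cls._instances[name] = super().__new__(cls)
            obj.name = name
            return obj

    def __repr__(self):
        return self.name

a = MiniSymbol("READ")
b = MiniSymbol("READ")    # same name: the very same object
c = MiniSymbol("WRITE")
```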
- fix_boundaries(stream, start, end)
Yield all items from the stream of tuples.
The first two items of each tuple are regarded as pos and end. This function adjusts the pos of the first item and the end of the last item so that they do not stick out of the range start..end. If the pos of the first item is below start, it is set to start; if the end of the last item is beyond end, it is set to end.
If start == 0, the first item will never be adjusted; if end is None, the last item will not be adjusted.
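The clamping rules above can be sketched as (mini_fix_boundaries is a hypothetical illustration that buffers the stream; the real function may work incrementally):

```python
def mini_fix_boundaries(stream, start, end):
    """Sketch: clamp the first item's pos and the last item's end to
    the range start..end; start == 0 and end is None disable clamping."""
    items = list(stream)
    if items:
        if start and items[0][0] < start:
            items[0] = (start,) + tuple(items[0][1:])
        if end is not None and items[-1][1] > end:
            last = items[-1]
            items[-1] = (last[0], end) + tuple(last[2:])
    yield from items

result = list(mini_fix_boundaries([(0, 5, 'a'), (5, 12, 'b')], 2, 10))
# the first pos is raised to 2, the last end lowered to 10
```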
- merge_adjacent(stream, factory=<class 'tuple'>)
Yield items from a stream of tuples.
The first two items of each tuple are regarded as pos and end. If two items are adjacent and the rest of their tuples compare equal, the items are merged.
Instead of the default factory tuple, you can give a named tuple or any other type to wrap the stream's items in.
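The merging logic can be sketched as follows (mini_merge_adjacent is a hypothetical stand-in; the factory parameter is omitted for brevity):

```python
def mini_merge_adjacent(stream):
    """Sketch: join items whose ranges touch (pos == previous end)
    and whose remaining fields compare equal."""
    stream = iter(stream)
    try:
        pos, end, *rest = next(stream)
    except StopIteration:
        return
    for npos, nend, *nrest in stream:
        if npos == end and nrest == rest:
            end = nend                    # extend the current range
        else:
            yield (pos, end, *rest)
            pos, end, rest = npos, nend, nrest
    yield (pos, end, *rest)

merged = list(mini_merge_adjacent([(0, 2, 'x'), (2, 5, 'x'), (5, 7, 'y')]))
# the two adjacent 'x' ranges are merged into one
```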
- merge_adjacent_actions(tokens)
Yield three-tuples (pos, end, action).
Adjacent actions that are the same are merged into one range.
- merge_adjacent_actions_with_language(tokens)
Yield four-tuples (pos, end, action, language).
Adjacent actions that are the same and occurred in the same language are merged into one range.
- get_bom_encoding(data)
Get the BOM (Byte Order Mark) of bytes data, if any.
A two-tuple (encoding, data) is returned. If the data starts with a BOM, its encoding is determined and the BOM is stripped off. Otherwise the returned encoding is None and the data is returned unchanged.
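BOM detection can be sketched with the standard library's BOM constants (mini_get_bom_encoding and the encoding names chosen here are illustrative assumptions, not necessarily what parce returns):

```python
import codecs

# check longer BOMs first: the UTF-32-LE BOM starts with the UTF-16-LE BOM
_BOMS = [
    (codecs.BOM_UTF8, 'utf-8'),
    (codecs.BOM_UTF32_LE, 'utf-32-le'),
    (codecs.BOM_UTF32_BE, 'utf-32-be'),
    (codecs.BOM_UTF16_LE, 'utf-16-le'),
    (codecs.BOM_UTF16_BE, 'utf-16-be'),
]

def mini_get_bom_encoding(data):
    """Sketch: return (encoding, data with BOM stripped), or (None, data)."""
    for bom, encoding in _BOMS:
        if data.startswith(bom):
            return encoding, data[len(bom):]
    return None, data

with_bom = mini_get_bom_encoding(codecs.BOM_UTF8 + b'hi')
without = mini_get_bom_encoding(b'hi')
```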
- split_list(l, separator)
Split list on items that compare equal to separator.
Yields result lists that may be empty.
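A minimal sketch of this behavior (mini_split_list is a hypothetical stand-in):

```python
def mini_split_list(l, separator):
    """Sketch: split a list on items comparing equal to separator;
    the yielded sublists may be empty."""
    current = []
    for item in l:
        if item == separator:
            yield current
            current = []
        else:
            current.append(item)
    yield current

parts = list(mini_split_list([1, 0, 2, 3, 0], 0))
# a trailing separator yields an empty final list
```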
- unroll(obj)
Unroll a tuple or list.
If the object is a tuple or list, yields the unrolled members recursively. Otherwise just the object itself is yielded.
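The recursive flattening can be sketched as (mini_unroll is a hypothetical stand-in):

```python
def mini_unroll(obj):
    """Sketch: recursively yield the members of nested tuples/lists,
    or the object itself when it is neither."""
    if isinstance(obj, (tuple, list)):
        for item in obj:
            yield from mini_unroll(item)
    else:
        yield obj

flat = list(mini_unroll([1, (2, [3, 4]), 5]))
single = list(mini_unroll('text'))   # a non-sequence is yielded as-is
```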
- tokens(nodes, reverse=False)
Helper to yield tokens from the iterable of nodes.
If reverse is set to True, yields the tokens of the nodes in backward direction.
- language_sister_class(language, template, base, try_parents=False)
Find a language sister class in the same module, with a name that matches the template, and which is a subclass of base.
If try_parents is True, the parent classes of the language class are checked if a sister class is not found.
The template string should contain {}, which is replaced with the language's class name. Returns None if no sister class is defined. Example:

>>> from parce.util import language_sister_class
>>> from parce.lang.json import Json
>>> from parce.transform import Transform
>>> language_sister_class(Json, "{}Transform", Transform)
<class 'parce.lang.json.JsonTransform'>
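The lookup can be sketched without parce installed; mini_language_sister_class is a hypothetical stand-in, and the explicit namespace dict stands in for the module in which the real function looks the class up:

```python
class Transform:
    pass

class Json:
    pass

class JsonTransform(Transform):   # the "sister" class of Json
    pass

def mini_language_sister_class(language, template, base, namespace,
                               try_parents=False):
    """Sketch: look up template.format(cls.__name__) in the given
    namespace and return it when it is a subclass of base."""
    classes = language.__mro__ if try_parents else (language,)
    for cls in classes:
        sister = namespace.get(template.format(cls.__name__))
        if isinstance(sister, type) and issubclass(sister, base):
            return sister
    return None

ns = {'Json': Json, 'JsonTransform': JsonTransform}
found = mini_language_sister_class(Json, "{}Transform", Transform, ns)
missing = mini_language_sister_class(Json, "{}Indent", Transform, ns)
```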