Source code for parce.treebuilder

# -*- coding: utf-8 -*-
#
# This file is part of the parce Python package.
#
# Copyright © 2019-2020 by Wilbert Berendsen <info@wilbertberendsen.nl>
#
# This module is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This module is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.


"""
This module defines classes and functions to build a tree structure by parsing
a text string.

To get the tree of tokens using a particular root lexicon from a string
of text, use :func:`build_tree`.

A more advanced approach is using the :class:`TreeBuilder`, which can build a
tree in one go as well, but is also capable of updating an existing tree when
the text changes at a particular position, e.g. while typing in a text editor.
In this case, tokens in front of the modified region are reused (carefully
checking whether changes affect earlier regions), and also tokens at the end of
the modified region are reused, if they have the same context ancestry.

TreeBuilder also reports the start and end position of the updated region, and
the lexicons that were left open at the end, which in some languages can mean
that a document or a certain structure is incomplete.

The TreeBuilder is designed so that it is possible to perform tokenizing in a
background thread, and even interrupt tokenizing when changes are to be applied
while processing previous changes.

"""

import threading

from parce import util
from parce.lexer import Lexer
from parce.target import TargetFactory
from parce.tree import Context, make_tokens
from parce.treebuilderutil import (
    BuildResult, ReplaceResult, Changes, ancestors_with_index,
    get_prepared_lexer, new_tree)


def build_tree(root_lexicon, text, pos=0):
    """Build and return a tree in one go.

    A root context is created for ``root_lexicon``. When the lexicon is
    truthy, a lexer is started on ``text`` (``pos`` is handed to the lexer's
    ``events()`` call unchanged) and every event it yields is appended to the
    tree. When the lexicon is falsy, the empty root context is returned.

    Returns the root context.

    """
    from parce.tree import Context, make_tokens  # local is faster

    root = Context(root_lexicon, None)
    if not root_lexicon:
        return root

    current = root
    for target, lexemes in Lexer([root_lexicon]).events(text, pos):
        if target:
            # target.pop is zero or negative: climb up that many contexts
            for _ in range(target.pop, 0):
                current = current.parent
            # then descend into a newly created context for every pushed lexicon
            for lexicon in target.push:
                child = Context(lexicon, current)
                child.parent.append(child)
                current = child
        current.extend(make_tokens(lexemes, current))
    return root
class TreeBuilder(util.Observable):
    """Build a tree from parsing the text.

    The root node of the tree is in the ``root`` instance attribute. This root
    context is never replaced, although its lexicon may change and of course
    its children.

    Call :meth:`rebuild` to build or rebuild the tree. This method stores the
    desired changes to the tree and calls :meth:`start_processing`, which can
    be re-implemented to support asynchronous tree building.

    The actual building of a tree happens in :meth:`build_new_tree` which
    builds a (replacement) tree without making any changes yet to the current
    tree. The result of :meth:`build_new_tree` is a tuple of arguments that is
    used when calling :meth:`replace_tree`, which integrates the updated
    subtree in the main tree structure.

    When processing has finished, three instance attributes are set:

    ``start``, ``end``:
        indicate the region the tokens were changed. After a complete build,
        start is 0 and end = len(text), but after a partial rebuild these
        values indicate the range that was actually re-tokenized.

    ``lexicons``:
        the list of open lexicons (excluding the root lexicon) at the end of
        the document. This way you can see in which lexicon parsing ended.

        If a tree was rebuilt, and old tail tokens were reused, the lexicons
        variable is not set, meaning that the old value is still valid. If the
        TreeBuilder was not used before, lexicons is an empty tuple.

    No other variables or state are kept, so if you don't need the above
    information anymore, you can throw away the TreeBuilder after use.

    During the building process, the TreeBuilder emits certain events you can
    subscribe to, using the :meth:`~parce.util.Observable.connect` method
    provided by the :class:`~parce.util.Observable` class that's mixed into
    this TreeBuilder class.

    The following events are emitted, with following arguments:

    ``"started"``:
        emitted when a (re)build starts; the handler is called without
        arguments

    ``"replace"``:
        emitted just before the tree actually changes (while the new tree is
        being built, the tree is still unchanged and accessible, but between
        the ``"replace"`` and ``"finished"`` events the tree is in an
        inconsistent state)

    ``"finished"``:
        emitted when a (re)build has finished; the handler is called without
        arguments

    ``"updated"``:
        emitted when a (re)build has finished; the handler is called with two
        arguments: ``start``, ``end``, that denote the updated range

    ``"peek"``:
        emitted by the default implementation of the :meth:`peek` method, the
        handler is called with two arguments: ``start``, ``tree``

    ``"invalidate"``:
        emitted by the default implementation of the
        :meth:`invalidate_context` method, the handler is called with the
        Context that needs to be invalidated

    For example, to get notified when a build process starts::

        >>> b = TreeBuilder(MyLang.root)
        >>> def hi_there():
        ...     print("started")
        ...
        >>> b.connect("started", hi_there)
        >>> b.rebuild("some boring text")
        started
        >>>

    """
    start = 0
    end = 0
    lexicons = ()
    peek_threshold = 0  #: set to a value > 0 to get :meth:`peek` called during building

    def __init__(self, root_lexicon=None):
        super().__init__()
        # guards the ``changes`` list and the ``busy`` hand-over in process()
        self._lock = threading.Lock()
        self.root = Context(root_lexicon, None)
        self.busy = False
        self.changes = []

    def tree(self, text):
        """Convenience method to build a tree and return the root node."""
        self.rebuild(text)
        return self.root

    def rebuild(self, text, root_lexicon=False, start=0, removed=0, added=None):
        """Tokenize the modified part of the text again and update the tree.

        The arguments:

        ``text``
            The text to parse. Always give the entire text, also when you only
            actually changed a small part. The tree builder needs to check
            text before and after the changed region, and possibly re-parse
            more text.

        ``root_lexicon``
            The root lexicon to use (default: False). False means no change;
            can be None or any Lexicon. If not False, the tree is always
            rebuilt completely.

        ``start``
            Position of the change (default: 0)

        ``removed``
            The number of removed characters (default: 0)

        ``added``
            The number of added characters (default: None, which means all
            characters from start to the end of the text)

        The change is stored using :meth:`add_changes`; if no processing is
        going on, :meth:`start_processing` is called, which eventually calls
        :meth:`build_new_tree` and :meth:`replace_tree` to do the actual work.

        """
        self.add_changes(text, root_lexicon, start, removed, added)
        if not self.busy:
            self.busy = True
            self.start_processing()

    def add_changes(self, text, root_lexicon, start, removed, added):
        """Add the changes to our changes list, but do not rebuild immediately.

        The arguments are the same as for :meth:`rebuild`.

        """
        if added is None:
            added = len(text) - start
        with self._lock:
            self.changes.append((text, root_lexicon, start, removed, added))

    def build_new_tree(self, text, root_lexicon, start, removed, added):
        """Build a new tree without yet modifying the current tree.

        This method is called by :meth:`process`. Returns a ``BuildResult``
        five-tuple with ``tree``, ``start``, ``end``, ``offset`` and
        ``lexicons`` values. The ``start`` and ``end`` are the insert positions
        in the old tree.

        Tokens from the current tree are reused as much as possible. From
        tokens at the tail (after the end of the modified region) the pos
        attribute is updated if necessary.

        The new ``tree`` is intended to replace a part of, or the whole old
        tree. If ``start`` == 0 and ``lexicons`` is not None; the whole tree
        can be replaced. (In this case; the root lexicon might have changed!)
        Use :meth:`replace_tree` to insert the result tree in the old tree.

        If ``start`` > 0, tokens in the old tree before start are to be
        preserved. If ``lexicons`` is None, old tail tokens after ``end`` must
        be reused, and the old list of open lexicons is still relevant. The
        ``offset`` then gives the position change for the tokens that are
        reused.

        While tokenizing, newly recorded changes (see :meth:`add_changes`) are
        picked up; the build is then restarted from the new start position,
        reusing the partially built tree where possible.

        """
        from parce.tree import Context, make_tokens  # local is faster

        if root_lexicon is not False:
            # a root lexicon was specified: always rebuild completely
            start, removed, added = 0, 0, len(text)
        else:
            root_lexicon = self.root.lexicon

        # manage end, and record if there is text after the modified part (tail)
        end = start + removed

        # record the position change for tail tokens that may be reused
        offset = added - removed

        if not root_lexicon:
            # without a lexicon there is nothing to tokenize: empty tree
            return BuildResult(Context(root_lexicon, None), start, start + added, 0, [])

        # If there remains text after the modified part,
        # we try to reuse the old tokens
        tail = False
        if start + added < len(text):
            # find the first token after the modified part
            tail_token = self.root.find_token_after(end)
            if tail_token:
                # tail_gen is only consulted while ``tail`` is True
                tail_gen = (
                    (t, t.pos) for t in tail_token.forward_including()
                    if not t.group
                    and not (t.is_first() and t.parent.lexicon.consume))
                for tail_token, tail_pos in tail_gen:
                    tail = True
                    break

        lowest_start = start
        changes = self.changes
        tree = None
        while True:
            # when restarting, see if we can reuse (part of) the new tree
            if tree:
                result = get_prepared_lexer(tree, text, start, True)
                if result:
                    lexer, events, tokens = result
                    t = tokens[0]
                    context = t.parent
                    # cut off everything to the right of t, at every level
                    for p, i in ancestors_with_index(t):
                        del p[i+1:]
                    # and t itself
                    del context[-1]
                else:
                    tree = None
            # find insertion spot in old tree
            if not tree:
                start = min(lowest_start, start)
                result = get_prepared_lexer(self.root, text, start)
                if result:
                    lexer, events, tokens = result
                    t = tokens[0]
                    context, tree = new_tree(t)
                    start = tokens[-1].end
                    lowest_start = min(lowest_start, start)
                else:
                    # no usable starting point: tokenize from the beginning
                    tree = context = Context(root_lexicon, None)
                    lexer = Lexer([root_lexicon])
                    events = lexer.events(text)
                    lowest_start = 0
            # position after which peek() is called once (0: disabled)
            peek = self.peek_threshold + lowest_start if self.peek_threshold else 0
            # start parsing
            for target, lexemes in events:
                if target:
                    for _ in range(target.pop, 0):
                        context = context.parent
                    for lexicon in target.push:
                        context = Context(lexicon, context)
                        context.parent.append(context)
                tokens = make_tokens(lexemes, context)
                if tail:
                    # handle tail
                    pos = tokens[0].pos - offset
                    if pos > tail_pos:
                        # advance to the first old tail token at or after pos
                        for tail_token, tail_pos in tail_gen:
                            if tail_pos >= pos:
                                break
                        else:
                            tail = False
                    if (pos == tail_pos and tokens[0].equals(tail_token)
                            and (context or not context.lexicon.consume)):
                        # we can reuse the tail from tail_pos
                        return BuildResult(tree, lowest_start, tail_pos, offset, None)
                context.extend(tokens)
                if changes:
                    # handle changes
                    c = self.get_changes()
                    if c:
                        # break out and adjust the current tokenizing process
                        text = c.text
                        start = c.start
                        if c.root_lexicon is not False:
                            root_lexicon = c.root_lexicon
                            if not root_lexicon:
                                return BuildResult(Context(root_lexicon, None), 0, len(text), 0, [])
                            start = 0
                            tail = False
                            tree = None
                        elif tail:
                            # reuse old tail?
                            new_tail_pos = start + c.added
                            if new_tail_pos >= len(text):
                                tail = False
                            else:
                                offset += c.added - c.removed
                                new_tail_pos -= offset
                                if new_tail_pos > tail_pos:
                                    for tail_token, tail_pos in tail_gen:
                                        if tail_pos >= new_tail_pos:
                                            break
                                    else:
                                        tail = False
                        break  # break the for loop to restart at new start pos
                if peek and tokens[-1].pos > peek:
                    # call peek with a copy of the current tree.
                    copied_tree = tree.copy()
                    # remove the first empty context
                    t = copied_tree[0]
                    while t and t.is_context:
                        t = t[0]
                    while t.is_context and not t:
                        t = t.parent
                        del t[0]
                    self.peek(lowest_start, copied_tree)
                    peek = 0    # only peek once per (re)start
            else:
                # we ran till the end, also return the open lexicons
                return BuildResult(tree, lowest_start, len(text), 0, lexer.lexicons[1:])
        raise RuntimeError("shouldn't come here")

    def replace_tree(self, result):
        """Modify the tree using the result from :meth:`build_new_tree`.

        In most types of GUI applications, this method should be called in the
        main (GUI) thread.

        The changes are delegated to the various ``replace_`` methods, which
        can be reimplemented to get fine-grained monitoring of and control over
        the tree-replacing process.

        Additionally, this method calls :meth:`invalidate_context` with the
        youngest Context that had children removed or added.

        Returns a ``ReplaceResult`` with the start position, the end position
        (with the offset applied) and the lexicons from the build result.

        """
        tree, start, end, offset, lexicons = result

        if not tree.lexicon or tree.lexicon != self.root.lexicon:
            # whole tree update
            root = self.root
            for n in tree:
                n.parent = root
            self.replace_nodes(self.root, slice(None), tree)
            self.replace_root_lexicon(tree.lexicon)
            self.invalidate_context(self.root)
        else:
            context = self.root
            # index trails from the root to the last token to keep (start)
            # and to the first old tail token to keep (end)
            start_trail = self.root.find_token_left_with_trail(start)[1] if start else []
            end_trail = self.root.find_token_with_trail(end)[1] if lexicons is None else []

            # find the context that really changes, adjust trails
            i = 0
            for i, (s, e) in enumerate(zip(start_trail, end_trail)):
                if s == e and len(tree) == 1 and tree[0].is_context:
                    context = context[s]
                    tree = tree[0]
                else:
                    break
            del start_trail[:i], end_trail[:i]

            for n in tree:
                n.parent = context
            slice_end = None
            if end_trail:
                # join stuff after end_trail with tree
                c = context
                t = tree
                end_trail[-1] -= 1
                for i in end_trail:
                    last = len(t) - 1
                    if slice_end is None:
                        slice_end = i + 1
                    else:
                        s = c[i+1:]
                        t.extend(s)
                        for n in s:
                            n.parent = t
                        if offset:
                            for n in util.tokens(s):
                                n.pos += offset
                    if t:
                        # t can be empty if len(end_trail) == 1,
                        # is last iteration anyway
                        t = t[last]
                    c = c[i]
            if offset:
                # if there remain nodes in the current context after tree
                # insertion, store the index now, later we'll adjust the pos
                if slice_end is not None:
                    replace_pos_index = slice_end - len(context)
                else:
                    replace_pos_index = 0
            if start_trail:
                # replace stuff after start_trail with tree
                c = context
                t = tree
                for i in start_trail[:-1]:
                    for n in t[1:]:
                        n.parent = c
                    self.replace_nodes(c, slice(i + 1, slice_end), t[1:])
                    slice_end = None
                    t = t[0]
                    c = c[i]
                i = start_trail[-1]
                for n in t:
                    n.parent = c
                self.replace_nodes(c, slice(i + 1, slice_end), t)
                self.invalidate_context(c)
            else:
                self.replace_nodes(context, slice(slice_end), tree)
                self.invalidate_context(context)
            if offset:
                # shift the old tokens that were left in place after the
                # inserted tree, here and in all ancestor contexts
                if replace_pos_index < 0:
                    self.replace_pos(context, len(context) + replace_pos_index, offset)
                for p, i in context.ancestors_with_index():
                    self.replace_pos(p, i + 1, offset)
        return ReplaceResult(start, end + offset, lexicons)

    def replace_nodes(self, context, slice_, nodes):
        """Replace the context's slice with new nodes.

        This method is called by :meth:`replace_tree`. You can reimplement
        this method to notify others of the change.

        """
        context[slice_] = nodes

    def replace_root_lexicon(self, lexicon):
        """Set the root lexicon.

        This method is called by :meth:`replace_tree`. You can reimplement
        this method to notify others of the change.

        """
        self.root.lexicon = lexicon

    def replace_pos(self, context, index, offset):
        """Adjust the pos attribute of all tokens in ``context[index:]``.

        This method is called by :meth:`replace_tree`. You can reimplement
        this method to notify others of the change.

        """
        for t in util.tokens(context[index:]):
            t.pos += offset

    def invalidate_context(self, context):
        """Called with the youngest Context that had children removed or added.

        This means that the meaning of this context probably has changed, for
        example when you want to transform the context to some other data
        structure, and that the ancestors also need to be invalidated.

        The default implementation of this method emits the ``invalidate``
        event, see :meth:`~parce.util.Observable.connect`.

        """
        self.emit("invalidate", context)

    def get_changes(self):
        """Get and combine the stored change requests in a Changes object.

        The stored requests are removed from the ``changes`` list, oldest
        first. This may only be called from the same thread that also performs
        the :meth:`rebuild`.

        """
        c = Changes()
        while self.changes:
            c.add(*self.changes.pop(0))
        return c

    def start_processing(self):
        """Called when there are recorded changes to process.

        The default implementation reads all build stages from the
        :meth:`process` generator until exhausted. You can inherit from this
        method to call it e.g. in a background thread.

        """
        for stage in self.process():
            pass

    def process(self):
        """Process all changes and update the tree.

        This method behaves as a generator coroutine, instead of simply
        calling this method, you should iterate over its output, which reports
        which stage the process is at.

        Yields "build" when about to build a new tree and "replace" when about
        to replace the tree (both are repeated as long as there are changes),
        and "done" at the very end, just before :meth:`process_finished` is
        called.

        When re-implementing :meth:`start_processing`, you can choose to
        decide which stages are to be run in a background thread and which in
        a main (GUI) thread. You should exhaust the generator fully.

        If a stage raises an exception (or the generator is closed early), the
        ``busy`` flag is cleared before the exception propagates, so that a
        later :meth:`rebuild` can start processing again.

        """
        start = end = -1
        lexicons = False    # no change
        try:
            self.process_started()
            while True:
                # The lock is never held across a yield. The final check for
                # pending changes and clearing ``busy`` happen in one critical
                # section, so a change added concurrently is either picked up
                # here or starts a new processing run.
                with self._lock:
                    c = self.get_changes()
                    if not (c and c.has_changes()):
                        if start != -1:
                            self.start = start
                        if end != -1:
                            self.end = end
                        if lexicons is not False:
                            self.lexicons = lexicons
                        self.busy = False
                        break
                yield "build"
                result = self.build_new_tree(
                    c.text, c.root_lexicon, c.start, c.removed, c.added)
                yield "replace"
                self.emit("replace")
                r = self.replace_tree(result)
                start = r.start if start == -1 else min(start, r.start)
                end = r.end if end == -1 else max(c.new_position(end), r.end)
                if r.lexicons is not None:
                    lexicons = r.lexicons
        except BaseException:
            # don't leave the builder stuck in the busy state
            self.busy = False
            raise
        yield "done"
        self.process_finished()

    def peek(self, start, tree):
        """This is called from :meth:`build_new_tree` with a sneak preview tree.

        This can be used to get a small tree before the new tree is built
        completely, which is useful to update e.g. highlighting of a small
        portion of a document that is edited by a user, instead of waiting on
        the whole tree to update (which may cause slow highlighting updates).

        When build_new_tree (the build stage) is called from a background
        thread, this method will also be called from that same thread.

        Enable the ``peek()`` feature by setting the ``peek_threshold``
        attribute to a value > 0. E.g. the value 1000 will cause the
        :meth:`peek` method to be called with a tree that encompasses at least
        1000 characters (starting with the start position).

        The tree that is given, is a copy of the current tree. It is safe to
        use it in another thread, although its contents are not valid anymore
        when the build has finished, or when a build is restarted, causing
        peek() to be called a second time. (A build is restarted when there
        are new changes close to the position the build originally started.)

        The default implementation of this method emits the ``peek`` event,
        see :meth:`~parce.util.Observable.connect`.

        """
        self.emit("peek", start, tree)

    def process_started(self):
        """Called at the start of the tree building process.

        The default implementation of this method emits the ``started`` event,
        see :meth:`~parce.util.Observable.connect`.

        """
        self.emit("started")

    def process_finished(self):
        """Called when tree building is done.

        The default implementation of this method emits the
        ``updated(start, end)`` and ``finished`` events, see
        :meth:`~parce.util.Observable.connect`.

        """
        self.emit("updated", self.start, self.end)
        self.emit("finished")