# -*- coding: utf-8 -*-
#
# This file is part of the parce Python package.
#
# Copyright © 2019-2020 by Wilbert Berendsen <info@wilbertberendsen.nl>
#
# This module is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This module is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
Document and Cursor form the basis of handling of documents in the parce
package.
A Document contains a text string that is mutable via item and slice methods.
If you make modifications while inside a context (using the Python context
manager protocol), the modifications are only applied when the context
exits for the last time.
For tokenized documents (see :class:`parce.Document`), parce inherits from this
base class (see the :mod:`~parce.work` module).
You can use a Cursor to keep track of positions in a document. The position
(and selection) of a Cursor is adjusted when the text in the document is
changed.
You can use the various ``find_block()`` and ``blocks()`` methods to iterate
over a Document on a line-by-line basis.
"""
import contextlib
import itertools
import re
import reprlib
import weakref
from . import mutablestring
from . import util
class AbstractDocument(mutablestring.AbstractMutableString):
    """Base class for a Document.

    A Document is like a mutable string, but understands :class:`Cursor` and
    :class:`Block`.

    """
    url = None              #: can be set to the url this document is loaded from
    encoding = None         #: can be set to the encoding used to read/write this document
    modified = False        #: Whether this document is modified
    block_separator = '\n'  #: separator to use for block boundaries (newline)

    def __init__(self, text="", url=None, encoding=None):
        """Initialize the cursor registry and the revision counter.

        ``text`` is accepted for signature compatibility but is not stored
        here; ``url`` and ``encoding`` only override the class-level defaults
        when they are truthy.

        """
        mutablestring.AbstractMutableString.__init__(self)
        # weak references: a Cursor is only tracked while someone else holds it
        self._cursors = weakref.WeakSet()
        self._revision = 0
        if url:
            self.url = url
        if encoding:
            self.encoding = encoding

    def _parse_key(self, key):
        """Get start and end values from key. Called by __[gs]etitem__.

        A text range (e.g. a :class:`Cursor`) is converted to the slice
        ``pos:end`` before the base implementation handles it.

        """
        if isinstance(key, AbstractTextRange):
            key = slice(key.pos, key.end)
        return super()._parse_key(key)

    def _update_cursors(self, changes):
        """Update the positions of the cursors.

        ``changes`` is an iterable of ``(start, end, text)`` tuples. They are
        assumed to be sorted, non-overlapping and expressed in positions of
        the text *before* the edit.

        While the changes are handled one by one, the cursors that were
        already adjusted live in the coordinates of the partially updated
        text. Therefore every change is shifted by the net length difference
        of the changes before it, prior to comparing it with a cursor.

        """
        first = 0       # index of the first cursor that can still be affected
        offset = 0      # net length difference of the changes handled so far
        cursors = sorted(self._cursors, key=lambda c: c.pos)
        for start, end, text in changes:
            delta = start + len(text) - end
            # translate to the coordinates the (already moved) cursors are in
            start += offset
            end += offset
            for c in cursors[first:]:
                ahead = c.pos > start
                if ahead:
                    if end >= c.pos:
                        c.pos = start       # pos was inside the replaced text
                    else:
                        c.pos += delta
                if c.end is not None and c.end >= start:
                    if end >= c.end:
                        # end was inside (or at) the replaced text: move it
                        # along to the end of the new text
                        c.end = start + len(text)
                    else:
                        c.end += delta
                elif not ahead:
                    first += 1  # don't consider this cursor any more
            offset += delta

    def _update_text(self, changes):
        """Register a text change: adjust the cursors, increment the revision
        and set the modified flag.

        The text itself is not touched here; that is left to the class that
        actually stores it.

        """
        self._update_cursors(changes)
        self._revision += 1
        self.modified = True

    def revision(self):
        """Return the revision number.

        This number is incremented by one on every document change.

        """
        return self._revision

    def find_start_of_block(self, position):
        """Find the start of the block the position is in."""
        sep = self.block_separator
        pos = self.text().rfind(sep, 0, position)
        return 0 if pos == -1 else pos + len(sep)

    def find_end_of_block(self, position):
        """Find the end of the block the position is in."""
        pos = self.text().find(self.block_separator, position)
        return len(self) if pos == -1 else pos

    def find_block(self, position):
        """Return a :class:`Block` representing the text line (block) at
        position.

        A position larger than the document's length just returns the last
        block. (A document always has at least one block.)

        """
        pos = self.find_start_of_block(position)
        end = self.find_end_of_block(pos)
        return Block(self, pos, end)

    def find_block_by_number(self, number):
        """Return the :class:`Block` for text line ``number``.

        The first block has number 0. Returns None when the document has less
        blocks than the specified number. Negative numbers count backwards from
        the end.

        Avoid this method and :meth:`block_count` where you can, they are
        potentially expensive for large documents. Prefer :meth:`find_block`
        and :meth:`Block.next_block` or :meth:`Block.previous_block` for
        iteration.

        """
        text = self.text()
        sep = self.block_separator
        seplen = len(sep)
        length = len(text)
        if number >= 0:
            # walk forward, separator by separator
            end = -seplen
            for n in range(number + 1):
                if end == length:
                    return None     # ran out of blocks
                pos = end + seplen
                end = text.find(sep, pos)
                if end == -1:
                    end = length
            block = Block(self, pos, end)
            block._block_number = n     # known for free when counting forward
        else:
            # walk backward from the end of the text
            pos = length + seplen
            for n in range(number, 0):
                if pos == 0:
                    return None     # ran out of blocks
                end = pos - seplen
                pos = text.rfind(sep, 0, end)
                pos = 0 if pos == -1 else pos + seplen
            block = Block(self, pos, end)
        return block

    def block_count(self):
        """Return the number of blocks (lines) in this document.

        This counts the number of occurrences of :attr:`block_separator` in the
        full text, incremented with 1. A document always has at least one
        block.

        Avoid this method and :meth:`find_block_by_number` where you can, they
        are potentially expensive for large documents. Prefer
        :meth:`find_block` and :meth:`Block.next_block` or
        :meth:`Block.previous_block` for iteration.

        """
        return self.text().count(self.block_separator) + 1

    def blocks(self, start=0, end=None):
        """Yield Blocks, starting at position start, ending at end.

        Start defaults to 0, end to None, which means iterate to the last block.
        The block ``start`` is in is always yielded.

        """
        # Compare with None explicitly instead of testing truthiness: Block
        # defines __len__, so a Block for an empty line could evaluate False
        # and end the iteration early.
        block = self.find_block(start)
        if end is None:
            while block is not None:
                yield block
                block = block.next_block()
        else:
            while True:
                yield block
                block = block.next_block()
                if block is None or block.pos >= end:
                    break

    def replace(self, old, new, start=0, end=None, count=0):
        """Replace occurrences of old with new in region start->end.

        If count > 0, specifies the maximum number of occurrences to be
        replaced.

        An empty ``old`` matches before every character and at the end of the
        region, like Python's ``str.replace()``.

        """
        if old == new:
            return
        text = self[start:end]
        length = len(old)
        # always advance at least one character, otherwise an empty ``old``
        # would be found at the same position forever
        step = length or 1
        with self:
            pos = text.find(old)
            while pos >= 0:
                self[start + pos:start + pos + length] = new
                pos = text.find(old, pos + step)
                count -= 1
                if count == 0:
                    break

    def re_sub(self, pattern, replacement, start=0, end=None, count=0, re_flags=0):
        """Replace regular expression matches of pattern with replacement.

        The pattern may be a string or a compiled regexp pattern object.
        Backreferences are allowed. The region can be set with start and end.
        If count > 0, specifies the maximum number of occurrences to be
        replaced.

        The replacement argument can also be a function, which is then called
        with the match object and should return the replacement string.

        If the pattern is a string, it is compiled to a regular expression
        object using the specified re_flags.

        """
        if isinstance(pattern, str):
            pattern = re.compile(pattern, re_flags)
        if callable(replacement):
            substitute = replacement
        else:
            def substitute(m):
                """Expand the backreferences in the replacement template."""
                return m.expand(replacement)
        text = self[start:end]
        with self:
            for i, m in enumerate(pattern.finditer(text), 1):
                # match positions are relative to the start of the region
                self[start + m.start():start + m.end()] = substitute(m)
                if i == count:
                    break

    def trim(self, start=0, end=None):
        """Remove trailing whitespace (spaces and tabs) in the specified region."""
        self.re_sub(r'[ \t]+$', '', start, end, re_flags=re.MULTILINE)

    def translate(self, mapping, start=0, end=None, count=0, whole_words=False):
        """Replace every occurrence of a key in mapping with its value.

        If whole_words is True, only match the keys at word boundaries.
        ``start``, ``end`` and ``count`` are passed on to :meth:`re_sub`.

        """
        from . import regex
        expr = regex.words2regexp(mapping.keys())
        if whole_words:
            expr = r"\b({})\b".format(expr)
        self.re_sub(expr, lambda m: mapping[m.group()], start, end, count)

    def text_changed(self, position, removed, added):
        """Called after ``_update_text()``.

        The default implementation does nothing.

        """
        pass
class Document(AbstractDocument, mutablestring.MutableString, util.Observable):
    """A basic Document with undo and modified status.

    This Document implements :class:`AbstractDocument` by holding the text in a
    hidden _text attribute. It adds support for undo/redo and has a
    :attr:`modified` state.

    It also inherits from :class:`~parce.util.Observable` and emits the
    following events:

    ``"text_change" (position, removed, added)``:
        emitted with ``position``, ``removed``, ``added`` arguments whenever the
        text changes

    ``"text_changed"``:
        emitted directly after the previous event, but without arguments

    ``"modification_changed" (bool)``:
        emitted when the :attr:`modified` state changes; True means the document
        was modified

    ``"undo_available" (bool)``:
        emitted when the availability of :meth:`undo` changes

    ``"redo_available" (bool)``:
        emitted when the availability of :meth:`redo` changes.

    """
    _in_undo = util.Switch()
    _in_redo = util.Switch()
    undo_redo_enabled = True

    def __init__(self, text="", url=None, encoding=None):
        """Initialize all base classes and start with empty undo/redo stacks."""
        AbstractDocument.__init__(self, text, url, encoding)
        mutablestring.MutableString.__init__(self, text)
        util.Observable.__init__(self)
        self._modified = False
        self._undo_stack = []
        self._redo_stack = []

    @property
    def modified(self):
        """Read or set whether the text is modified, happens automatically normally."""
        return self._modified

    @modified.setter
    def modified(self, modified):
        # While an undo or redo is being applied the flag is left alone;
        # _apply_undo_redo() restores the stored state afterwards.
        if self._in_undo or self._in_redo:
            return
        previous, self._modified = self._modified, modified
        if not modified:
            # the current text is now the unmodified one, so every other
            # state that can be reached via undo/redo counts as modified
            self._set_all_undo_redo_modified()
        if modified != previous:
            self.emit("modification_changed", modified)

    def _update_text(self, changes):
        """Apply the changes to the text.

        When undo/redo is enabled, the reverse changes are stored first,
        because computing them needs the text as it is before the update.

        """
        # NOTE(review): the indentation of this method was lost in this copy
        # of the source; the text update is assumed to happen inside the
        # context, so availability events fire after the text was updated.
        with self._check_undo_state():
            if self.undo_redo_enabled:
                reverse = self._reverse_changes(changes)
                self._store_undo(reverse)
            AbstractDocument._update_text(self, changes)
            mutablestring.MutableString._update_text(self, changes)

    def _reverse_changes(self, changes):
        """Return the changes that would be needed to undo the given list of changes.

        The returned ``(start, end, text)`` tuples address the text as it will
        be after ``changes`` have been applied, and carry the text that is
        currently in the replaced ranges.

        """
        old_text = self.text()
        reverse = []
        head = 0
        for start, end, text in changes:
            head += start
            reverse.append((head, head + len(text), old_text[start:end]))
            head += len(text) - end
        return reverse

    def _store_undo(self, changes):
        """Store changes needed to reconstruct the previous state.

        The current modified state is stored along with the changes. Changes
        made while undoing go to the redo stack, all others to the undo stack;
        a change that is neither an undo nor a redo clears the redo stack.

        """
        state = [changes, self.modified]
        if self._in_undo:
            self._redo_stack.append(state)
            return
        self._undo_stack.append(state)
        if not self._in_redo:
            self._redo_stack.clear()

    def _apply_undo_redo(self, switch, stack):
        """Apply changes from the specified stack (undo or redo).

        The ``switch`` is active while the changes are applied; afterwards the
        modified state that was stored with the changes is restored. Does
        nothing when the stack is empty.

        Raises RuntimeError when called while in an edit context.

        """
        if self._edit_context > 0:
            raise RuntimeError("can't undo or redo while in edit context")
        if not stack:
            return
        changes, modified = stack.pop()
        with switch, self:
            for start, end, text in changes:
                self[start:end] = text
        # only now, with the switch released, the setter accepts the value
        self.modified = modified

    @contextlib.contextmanager
    def _check_undo_state(self):
        """Context manager to perform operations that alter the undo / redo stack.

        Emits "undo_available" and "redo_available" when they change.

        """
        before = (self.can_undo(), self.can_redo())
        try:
            yield
        finally:
            after = (self.can_undo(), self.can_redo())
            events = ("undo_available", "redo_available")
            for event, old, new in zip(events, before, after):
                if new != old:
                    self.emit(event, new)

    def _set_all_undo_redo_modified(self):
        """Mark every state stored on the undo and redo stacks as modified.

        Called when the modified flag is set to False.

        """
        for state in itertools.chain(self._undo_stack, self._redo_stack):
            state[1] = True

    def undo(self):
        """Undo the last modification."""
        self._apply_undo_redo(self._in_undo, self._undo_stack)

    def redo(self):
        """Redo the last undone modification."""
        self._apply_undo_redo(self._in_redo, self._redo_stack)

    def clear_undo_redo(self):
        """Clear the undo/redo stack."""
        with self._check_undo_state():
            self._undo_stack.clear()
            self._redo_stack.clear()

    def can_undo(self):
        """Return True if undo is possible."""
        return bool(self._undo_stack)

    def can_redo(self):
        """Return True if redo is possible."""
        return bool(self._redo_stack)

    def text_changed(self, position, removed, added):
        """Called after ``_update_text()`` has been called.

        The default implementation emits the ``"text_change"`` and
        ``"text_changed"`` events.

        """
        self.emit("text_change", position, removed, added)
        self.emit("text_changed")
class Cursor(AbstractTextRange):
    """Describes a certain range (selection) in a :class:`Document`.

    You may change the ``pos`` and ``end`` attributes yourself. Both must be an
    integer, end may also be None, denoting the end of the document.

    As long as you keep a reference to the Cursor, its positions are updated
    when the document changes. When text is inserted at ``pos``, the position
    remains the same. But when text is inserted at the end of a cursor, the
    ``end`` position (if not None) moves along with the new text. E.g.::

        >>> from parce.document import Document, Cursor
        >>> d = Document('hi there, folks!')
        >>> c = Cursor(d, 8, 8)
        >>> with d:
        ...     d[8:8] = 'new text'
        ...
        >>> c.pos, c.end
        (8, 16)

    You can also use a Cursor as key while editing a document::

        >>> c = Cursor(d, 8, 8)
        >>> with d:
        ...     d[c] = 'new text'

    You cannot alter the document via the Cursor. All move and select methods
    return the cursor again, so they can be chained::

        >>> c = Cursor(d).select_all()
        >>> c.pos, c.end
        (0, None)

    """
    # the document keeps its cursors in a WeakSet, so instances must be
    # weakly referenceable
    __slots__ = ("__weakref__",)

    def __init__(self, document, pos=0, end=-1):
        """Init with document. ``pos`` defaults to 0 and ``end`` defaults to pos."""
        if end == -1:   # -1 is the "not given" marker; None is a valid end
            end = pos
        super().__init__(document, pos, end)
        document._cursors.add(self)

    def block(self):
        """Return the :class:`Block` our ``pos`` is in."""
        doc = self.document()
        return doc.find_block(self.pos)

    def blocks(self):
        """Yield the Blocks from pos to end."""
        doc = self.document()
        yield from doc.blocks(self.pos, self.end)

    def move_start_of_block(self):
        """Move pos and end to the start of the current block. Returns self."""
        start = self.document().find_start_of_block(self.pos)
        self.pos = self.end = start
        return self

    def move_end_of_block(self):
        """Move pos and end to the end of the current block. Returns self."""
        stop = self.document().find_end_of_block(self.pos)
        self.pos = self.end = stop
        return self

    def select(self, pos, end=-1):
        """Change pos and end in one go. End defaults to pos. Returns self."""
        if end == -1:
            end = pos
        self.pos = pos
        self.end = end
        return self

    def select_all(self):
        """Set pos to 0 and end to None; selecting all text. Returns self."""
        self.pos, self.end = 0, None
        return self

    def select_none(self):
        """Set end to pos. Returns self."""
        self.end = self.pos
        return self

    def selection(self):
        """Return the two-tuple (pos, end) denoting the selected range.

        The ``end`` value is never None, it is set to the length of the
        document if the :attr:`~.document.AbstractTextRange.end` attribute is None.

        """
        if self.end is None:
            return self.pos, len(self.document())
        return self.pos, self.end

    def has_selection(self):
        """Return True if text is selected."""
        pos, end = self.selection()
        return pos < end

    def select_start_of_block(self):
        """Moves the selection pos to the beginning of the current line.

        Returns self.

        """
        doc = self.document()
        self.pos = doc.find_start_of_block(self.pos)
        return self

    def select_end_of_block(self):
        """Moves the selection end (if not None) to the end of its line.

        Returns self.

        """
        if self.end is not None:
            doc = self.document()
            self.end = doc.find_end_of_block(self.end)
        return self

    def lstrip(self, chars=None):
        """Move pos to the right, if specified characters can be skipped.

        By default whitespace is skipped, like Python's lstrip() string method.
        Returns self.

        """
        selected = self.text()
        if selected:
            self.pos += len(selected) - len(selected.lstrip(chars))
        return self

    def rstrip(self, chars=None):
        """Move end to the left, if specified characters can be skipped.

        By default whitespace is skipped, like Python's rstrip() string method.
        Returns self.

        """
        selected = self.text()
        if selected:
            skipped = len(selected) - len(selected.rstrip(chars))
            if skipped:
                # make end a concrete position inside the document before
                # moving it to the left
                doc_length = len(self.document())
                if self.end is None or self.end > doc_length:
                    self.end = doc_length
                self.end -= skipped
        return self

    def strip(self, chars=None):
        """Adjust pos and end, like Python's strip() method. Returns self."""
        self.rstrip(chars)
        self.lstrip(chars)
        return self
class Block(AbstractTextRange):
    r"""Represents a single line (block) of text in the :class:`Document`.

    Block objects are separated by newlines in the Document, and are created
    by Document.find_block() or Cursor.block(), and the blocks() iterator of
    both Cursor and Document.

    Unlike :class:`Cursor`, Block objects do not update their position when the
    document is changed. You should use Blocks while iterating but throw them
    away after applying changes to a Document.

    Blocks can be compared: blocks originating from the same document compare
    equal when they point to the same position. You can also use the ``<``,
    ``<=``, ``>`` and ``>=`` operators.

    """
    # _block_number is a cache that stays unset until the number is known
    __slots__ = ('_block_number',)

    def __init__(self, document, pos, end):
        """Init with document and the ``pos`` and ``end`` of the block."""
        super().__init__(document, pos, end)
        if pos == 0:
            # the block at the very start is the first one, by definition
            self._block_number = 0

    def __len__(self):
        return self.end - self.pos

    def is_first(self):
        """True if this is the first block."""
        return self.pos == 0

    def is_last(self):
        """True if this is the last block."""
        return self.end >= len(self.document())

    @property
    def block_number(self):
        """The number of this block in the document.

        The first block has number 0. If the number is not yet known, it is
        computed by counting the block separators before this block, and then
        remembered.

        """
        try:
            return self._block_number
        except AttributeError:
            doc = self.document()
            number = self._block_number = doc[:self.pos].count(doc.block_separator)
            return number

    def next_block(self):
        """The next block if available."""
        if self.is_last():
            return None
        doc = self.document()
        pos = self.end + len(doc.block_separator)
        block = type(self)(doc, pos, doc.find_end_of_block(pos))
        # pass the block number on, but only if ours is already known
        with contextlib.suppress(AttributeError):
            block._block_number = self._block_number + 1
        return block

    def previous_block(self):
        """The previous block if available."""
        if self.pos <= 0:
            return None
        doc = self.document()
        end = self.pos - len(doc.block_separator)
        block = type(self)(doc, doc.find_start_of_block(end), end)
        # pass the block number on, but only if ours is already known
        with contextlib.suppress(AttributeError):
            block._block_number = self._block_number - 1
        return block

    def tokens(self):
        """Convenience method returning a tuple with all Tokens that are in
        or overlap this block.

        The Document must have the :class:`~.work.WorkerDocumentMixin` class
        mixed in (i.e. have the :meth:`~.work.WorkerDocumentMixin.get_root`
        method).

        """
        return tuple(super().tokens())