Source code for ucca.textutil

"""Utility functions for UCCA package."""
import os
import sys
import time
from collections import OrderedDict
from collections import deque
from contextlib import contextmanager
from enum import Enum
from itertools import groupby, islice
from operator import attrgetter, itemgetter

import numpy as np
from tqdm import tqdm

from ucca import layer0, layer1

MODEL_ENV_VAR = "SPACY_MODEL"  # Determines the default spaCy model to load
DEFAULT_MODEL = {"en": "en_core_web_md", "fr": "fr_core_news_md", "de": "de_core_news_md", "ru": "ru"}

N_THREADS = 4
BATCH_SIZE = 50


[docs]class Attr(Enum): """Wrapper for spaCy Attr, determining order for saving in layer0.extra per token when as_array=True""" ORTH = 0 LEMMA = 1 TAG = 2 POS = 3 ENT_TYPE = 4 ENT_IOB = 5 DEP = 6 HEAD = 7 SHAPE = 8 PREFIX = 9 SUFFIX = 10
[docs] def __call__(self, value, vocab=None, as_array=False, lang=None): """Resolve numeric ID of attribute value to string (if as_array=False) or to int (if as_array=True)""" if value is None: return None if self in (Attr.ENT_IOB, Attr.HEAD): return int(np.int64(value)) if as_array: is_str = isinstance(value, str) if is_str or self in (Attr.ORTH, Attr.LEMMA): try: # Will find the value even if it's a new string, but that's OK since the hash is deterministic i = get_vocab(vocab, lang).strings[value] if is_str: # Replace with numeric ID since as_array=True value = i except KeyError: value = None return value if value is None or isinstance(value, str) else int(value) try: return get_vocab(vocab, lang)[value].text except KeyError: return None
@property def key(self): """String used in `extra' dict of Terminals to store this attribute when as_array=False""" return self.name.lower()
[docs]def get_nlp(lang="en"): """ Load spaCy model for a given language, determined by `models' dict or by MODEL_ENV_VAR """ instance = nlp.get(lang) if instance is None: model = models.get(lang) if not model: models[lang] = model = os.environ.get("_".join((MODEL_ENV_VAR, lang.upper()))) or \ os.environ.get(MODEL_ENV_VAR) or DEFAULT_MODEL.get(lang, "xx") started = time.time() with external_write_mode(): print("Loading spaCy model '%s'... " % model, end="", flush=True) nlp[lang] = instance = load_spacy_model(model) with external_write_mode(): print("Done (%.3fs)." % (time.time() - started)) tokenizer[lang] = instance.tokenizer import spacy instance.tokenizer = lambda words: spacy.tokens.Doc(instance.vocab, words=words) return instance
[docs]def load_spacy_model(model): if model == "ru": try: from spacy.lang.ru import Russian return Russian() except OSError as e: raise OSError("Failed to get spaCy Russian model. Install it using " "pip install git+https://github.com/aatimofeev/spacy_russian_tokenizer.git") from e import spacy try: return spacy.load(model) except OSError: spacy.cli.download(model) # Workaround from https://github.com/explosion/spaCy/issues/3435#issuecomment-474580269 from spacy.cli import link from spacy.util import get_package_path link(model, model, force=True, model_path=get_package_path(model)) try: return spacy.load(model) except OSError as e: raise OSError("Failed to get spaCy model. Download it manually using " "`python -m spacy download %s`." % model) from e
models = {} # maps language two-letter code to name of spaCy model nlp = {} # maps language two-letter code to actual loaded spaCy model tokenizer = {} # maps language two-letter code to tokenizer of spaCy model
[docs]def get_tokenizer(tokenized=False, lang="en"): instance = get_nlp(lang) return instance.tokenizer if tokenized else tokenizer[lang]
[docs]def get_vocab(vocab=None, lang=None): if vocab is not None: return vocab return (get_nlp(lang) if lang else get_nlp()).vocab
[docs]def get_word_vectors(dim=None, size=None, filename=None, vocab=None): """ Get word vectors from spaCy model or from text file :param dim: dimension to trim vectors to (default: keep original) :param size: maximum number of vectors to load (default: all) :param filename: text file to load vectors from (default: from spaCy model) :param vocab: instead of strings, look up keys of returned dict in vocab (use lang str, e.g. "en", for spaCy vocab) :return: tuple of (dict of word [string or integer] -> vector [NumPy array], dimension) """ orig_keys = vocab is None if isinstance(vocab, str) or not filename: vocab = get_nlp(vocab if isinstance(vocab, str) else "en").vocab def _lookup(word): try: return word.orth_ if orig_keys else word.orth except AttributeError: if orig_keys: return word lex = vocab[word] return getattr(lex, "orth", lex) if filename: it = read_word_vectors(dim, size, filename) nr_row, nr_dim = next(it) vectors = OrderedDict(islice(tqdm(((_lookup(w), v) for w, v in it if orig_keys or w in vocab), desc="Loading '%s'" % filename, postfix=dict(dim=nr_dim), file=sys.stdout, total=nr_row, unit=" vectors"), nr_row)) else: # return spaCy vectors nr_row, nr_dim = vocab.vectors.shape if dim is not None and dim < nr_dim: nr_dim = int(dim) vocab.vectors.resize(shape=(int(size or nr_row), nr_dim)) lexemes = sorted([l for l in vocab if l.has_vector], key=attrgetter("prob"), reverse=True)[:size] vectors = OrderedDict((_lookup(l), l.vector) for l in lexemes) return vectors, nr_dim
[docs]def read_word_vectors(dim, size, filename): """ Read word vectors from text file, with an optional first row indicating size and dimension :param dim: dimension to trim vectors to :param size: maximum number of vectors to load :param filename: text file to load vectors from :return: generator: first element is (#vectors, #dims); and all the rest are (word [string], vector [NumPy array]) """ try: first_line = True nr_row = nr_dim = None with open(filename, encoding="utf-8") as f: for line in f: fields = line.split() if first_line: first_line = False try: nr_row, nr_dim = map(int, fields) is_header = True except ValueError: nr_dim = len(fields) - 1 # No header, just get vector length from first one is_header = False if dim and dim < nr_dim: nr_dim = dim yield size or nr_row, nr_dim if is_header: continue # Read next line word, *vector = fields if len(vector) >= nr_dim: # May not be equal if word is whitespace yield word, np.asarray(vector[-nr_dim:], dtype="f") except OSError as e: raise IOError("Failed loading word vectors from '%s'" % filename) from e
[docs]def annotate(passage, *args, **kwargs): """ Run spaCy pipeline on the given passage, unless already annotated :param passage: Passage object, whose layer 0 nodes will be added entries in the `extra' dict """ list(annotate_all([passage], *args, **kwargs))
[docs]def annotate_as_tuples(passages, replace=False, as_array=False, as_extra=True, lang="en", vocab=None, verbose=False): for passage_lang, passages_by_lang in groupby(passages, get_lang): for need_annotation, stream in groupby(to_annotate(passages_by_lang, replace, as_array, as_extra), lambda x: bool(x[0])): annotated = get_nlp(passage_lang or lang).pipe( stream, as_tuples=True, n_threads=N_THREADS, batch_size=BATCH_SIZE) if need_annotation else stream annotated = set_docs(annotated, as_array, as_extra, passage_lang or lang, vocab, replace, verbose) for passage, passages in groupby(annotated, itemgetter(0)): yield deque(passages, maxlen=1).pop() # Wait until all paragraphs have been annotated
[docs]def annotate_all(passages, replace=False, as_array=False, as_extra=True, as_tuples=False, lang="en", vocab=None, verbose=False): """ Run spaCy pipeline on the given passages, unless already annotated :param passages: iterable of Passage objects, whose layer 0 nodes will be added entries in the `extra' dict :param replace: even if a given passage is already annotated, replace with new annotation :param as_array: instead of adding `extra' entries to each terminal, set layer 0 extra["doc"] to array of ids :param as_extra: set `extra' entries to each terminal :param as_tuples: treat input as tuples of (passage text, context), and return context for each passage as-is :param lang: optional two-letter language code, will be overridden if passage has "lang" attrib :param vocab: optional dictionary of vocabulary IDs to string values, to avoid loading spaCy model :param verbose: whether to print annotated text :return: generator of annotated passages, which are actually modified in-place (same objects as input) """ if not as_tuples: passages = ((p,) for p in passages) for t in annotate_as_tuples(passages, replace=replace, as_array=as_array, as_extra=as_extra, lang=lang, vocab=vocab, verbose=verbose): yield t if as_tuples else t[0]
[docs]def get_lang(passage_context): return passage_context[0].attrib.get("lang")
[docs]def to_annotate(passage_contexts, replace, as_array=False, as_extra=True): """Filter passages to get only those that require annotation; split to paragraphs and return generator of (list of tokens, (paragraph index, list of Terminals, Passage) + original context appended) tuples""" return (([t.text for t in terminals] if replace or not is_annotated(passage, as_array, as_extra) else (), (i, terminals, passage) + tuple(context)) for passage, *context in passage_contexts for i, terminals in enumerate(break2paragraphs(passage, return_terminals=True)))
[docs]def is_annotated(passage, as_array=False, as_extra=True): """Whether the passage is already annotated or only partially annotated""" l0 = passage.layer(layer0.LAYER_ID) docs = l0.extra.get("doc") if as_array: if not (not l0.all or docs is not None and len(docs) == max(t.paragraph for t in l0.all) and sum(map(len, docs)) == len(l0.all) and all(i is None or isinstance(i, int) for l in docs for t in l for i in t)): return False if as_extra: if not all(a.key in t.extra for t in l0.all for a in Attr): return False return True
[docs]def set_docs(annotated, as_array, as_extra, lang, vocab, replace, verbose): """Given spaCy annotations, set values in layer0.extra per paragraph if as_array=True, and in Terminal.extra if as_extra=True""" for doc, (i, terminals, passage, *context) in annotated: if doc: # Not empty, so copy values from spacy import attrs arr = doc.to_array([getattr(attrs, a.name) for a in Attr]) if as_array: docs = passage.layer(layer0.LAYER_ID).docs(i + 1) existing = docs[i] + (len(arr) - len(docs[i])) * [len(Attr) * [None]] docs[i] = [[a(v if e is None or replace else e, get_vocab(vocab, lang), as_array=True) for a, v, e in zip(Attr, values, es)] for values, es in zip(arr, existing)] if as_extra: for terminal, values in zip(terminals, arr): for attr, value in zip(Attr, values): if replace or not terminal.extra.get(attr.key): terminal.extra[attr.key] = attr(value, get_vocab(vocab, lang)) if verbose: data = [[a.key for a in Attr]] + \ [[str(a(t.tok[a.value], get_vocab(vocab, lang)) if as_array else t.extra[a.key]) for a in Attr] for j, t in enumerate(terminals)] width = [max(len(f) for f in t) for t in data] for j in range(len(Attr)): try: print(" ".join("%-*s" % (w, f[j]) for f, w in zip(data, width))) except UnicodeEncodeError: pass print() yield (passage,) + tuple(context)
SENTENCE_END_MARKS = ('.', '?', '!') QUOTES = ("'", '"', "`", "»", "«")
[docs]def break2sentences(passage, lang="en", *args, **kwargs): """ Breaks paragraphs into sentences according to the annotation. A sentence is a list of terminals which ends with a mark from SENTENCE_END_MARKS, and is also the end of a paragraph or parallel scene. :param passage: the Passage object to operate on :param lang: optional two-letter language code :return: a list of positions in the Passage, each denotes a closing Terminal of a sentence. """ del args, kwargs l1 = passage.layer(layer1.LAYER_ID) terminals = extract_terminals(passage) if not terminals: return [] if any(n.outgoing for n in l1.all): # Passage is labeled ps_ends = [ps.end_position for ps in l1.top_scenes] ps_starts = [ps.start_position for ps in l1.top_scenes] marks = [] for terminal in terminals: # Annotations doesn't always include the ending period (or other mark) # with the parallel scene it closes. Hence, if the terminal before the # mark closed the parallel scene, and this mark doesn't open a scene # in any way (hence it probably just "hangs" there), it's a sentence end if terminal.text in SENTENCE_END_MARKS and \ (terminal.position in ps_ends or (terminal.position - 1) in ps_ends and terminal.position not in ps_starts) or \ terminal.position - 1 in marks and layer0.is_punct(terminal) and not \ (terminal.text in QUOTES and terminal.text == terminals[marks[-1] - 1].text): marks.append(terminal.position) else: # Not labeled, split using spaCy annotated = get_nlp(lang=lang)([t.text for t in terminals]) marks = [span.end for span in annotated.sents] marks = sorted(set(marks + break2paragraphs(passage))) # Avoid punctuation-only sentences by picking the last punctuation symbol in each consecutive sequence if len(marks) > 1: marks = [x for x, y in zip(marks[:-1], marks[1:]) if not all(map(layer0.is_punct, terminals[x - 1:y - 1]))] + \ [marks[-1]] return marks
[docs]def extract_terminals(p): """returns an iterator of the terminals of the passage p""" return p.layer(layer0.LAYER_ID).all
[docs]def break2paragraphs(passage, return_terminals=False, *args, **kwargs): """ Breaks into paragraphs according to the annotation. Uses the `paragraph' attribute of layer 0 to find paragraphs. :param passage: the Passage object to operate on :param return_terminals: whether to return actual Terminal objects of all terminals rather than just end positions :return: a list of positions in the Passage, each denotes a closing Terminal of a paragraph. """ del args, kwargs terminals = sorted(extract_terminals(passage), key=attrgetter("position")) if not terminals: return [] return [list(p) for _, p in groupby(terminals, key=attrgetter("paragraph"))] if return_terminals else \ [t1.position for t1, t2 in zip(terminals[:-1], terminals[1:]) if t2.para_pos == 1 or t1.paragraph != t2.paragraph] + [terminals[-1].position]
[docs]def indent_xml(xml_as_string): """ Indents a string of XML-like objects. This works only for units with no text or tail members, and only for strings whose leaves are written as <tag /> and not <tag></tag>. :param xml_as_string: XML string to indent :return: indented XML string """ tabs = 0 lines = str(xml_as_string).replace('><', '>\n<').splitlines() s = '' for line in lines: if line.startswith('</'): tabs -= 1 s += (" " * tabs) + line + '\n' if not (line.endswith('/>') or line.startswith('</')): tabs += 1 return s
[docs]@contextmanager def external_write_mode(*args, **kwargs): try: with tqdm.external_write_mode(*args, **kwargs): yield except AttributeError: yield