#!/usr/bin/env python3
import argparse
import logging
import sys
from requests.exceptions import HTTPError
import json
from ucca.convert import to_json, to_text
from ucca.ioutil import get_passages_with_progress_bar
from uccaapp.api import ServerAccessor
try:
from simplejson.scanner import JSONDecodeError
except ImportError:
from json.decoder import JSONDecodeError
desc = """Convert a passage file to JSON format and upload to UCCA-App as a completed task"""
# https://github.com/omriabnd/UCCA-App/blob/master/UCCAApp_REST_API_Reference.pdf
# ucca-demo.cs.huji.ac.il or ucca.staging.cs.huji.ac.il
# upload the parse as a (completed) task:
# 0. decide which project and user you want to assign it to
# 1. POST passage (easy format)
# 2. POST task x (of type tokenization)
# 3. PUT task x (submit)
# 4. POST task y (of type annotation with parent x; this is the more complicated format)
# 5. PUT task y (submit)
[docs]class TaskUploader(ServerAccessor):
def __init__(self, user_id, source_id, project_id, **kwargs):
super().__init__(**kwargs)
self.set_source(source_id)
self.set_project(project_id)
self.set_user(user_id)
[docs] def upload_tasks(self, filenames, log=None, submit=True, existing_ids=None, **kwargs):
del kwargs
log_h = open(log, "w", encoding="utf-8") if log else None
if existing_ids:
with open(existing_ids, "r", encoding="utf-8") as ids_h:
ids = {old_passage_id: (passage_id, tok_id, ann_id)
for (old_passage_id, passage_id, tok_id, ann_id)
in map(str.split, ids_h)}
else:
ids = None
try:
for passage in get_passages_with_progress_bar(filenames, desc="Uploading"):
logging.debug("Uploading passage %s" % passage.ID)
task = self.upload_task(passage, log=log_h, submit=submit, ids=ids)
logging.debug("Submitted task %d" % task["id"])
yield task
except HTTPError as e:
try:
raise ValueError((e.response.json() if e.response else json.loads(e.args[0]))["detail"]) from e
except JSONDecodeError:
raise ValueError(e.response.text) from e
finally:
if log:
log_h.close()
[docs] def upload_task(self, passage, log=None, submit=True, ids=None, upload=True):
if ids:
passage_id, tok_id, ann_id = ids[passage.ID]
passage_out = self.get_passage(passage_id)
tok_user_task_out = tok_task_out = self.get_user_task(tok_id)
ann_user_task_in = self.get_user_task(ann_id)
else:
passage_out = self.create_passage(text=to_text(passage, sentences=False)[0], type="PUBLIC",
source=self.source, external_id=passage.ID) if upload else passage
task_in = dict(type="TOKENIZATION", status="ONGOING", project=self.project, user=self.user,
passage=passage_out, manager_comment=passage.ID, user_comment=passage.ID, parent=None,
is_demo=False, is_active=True)
tok_task_out = self.create_task(**task_in) if upload else task_in
tok_user_task_in = dict(tok_task_out)
tok_user_task_in.update(to_json(passage, return_dict=True, tok_task=True))
tok_user_task_out = self.submit_task(**tok_user_task_in) if upload else tok_user_task_in
task_in.update(parent=tok_task_out, type="ANNOTATION")
ann_user_task_in = self.create_task(**task_in) if upload else task_in
ann_user_task_in.update(
to_json(passage, return_dict=True, tok_task=tok_user_task_out, all_categories=self.layer["categories"]))
ann_user_task_out = self.submit_task(**ann_user_task_in, submit=submit) if upload else ann_user_task_in
if log:
print(passage.ID, passage_out["id"], tok_task_out["id"], ann_user_task_out["id"],
file=log, sep="\t", flush=True)
return ann_user_task_out
[docs] @staticmethod
def add_arguments(argparser):
argparser.add_argument("filenames", nargs="+", help="passage file names to convert and upload")
argparser.add_argument("-l", "--log", help="filename to write log of uploaded passages to")
argparser.add_argument("--no-submit", action="store_false", dest="submit", help="do not submit annotation task")
argparser.add_argument("--existing-ids", help="use existing task IDs from file (output of --log); no creation")
argparser.add_argument("-n", "--no-upload", action="store_false", dest="upload", help="do not upload anything")
ServerAccessor.add_project_id_argument(argparser)
ServerAccessor.add_source_id_argument(argparser)
ServerAccessor.add_user_id_argument(argparser)
ServerAccessor.add_arguments(argparser)
[docs]def main(**kwargs):
list(TaskUploader(**kwargs).upload_tasks(**kwargs))
if __name__ == "__main__":
argument_parser = argparse.ArgumentParser(description=desc)
TaskUploader.add_arguments(argument_parser)
main(**vars(argument_parser.parse_args()))
sys.exit(0)