"""Download and process ORCID in bulk."""
from __future__ import annotations
import csv
import gzip
import json
import logging
import tarfile
import typing
from collections import Counter
from collections.abc import Iterable
from functools import partial
from pathlib import Path
from typing import TYPE_CHECKING, Any, NamedTuple
from urllib.parse import parse_qs, unquote, urlparse
import bioregistry
import pystow
from lxml import etree
from pydantic import BaseModel, Field
from pydantic_extra_types.country import CountryAlpha2, _index_by_alpha2
from semantic_pydantic import SemanticField
from tqdm.auto import tqdm
from orcid_downloader.name_utils import clean_name
from orcid_downloader.standardize import standardize_role
if TYPE_CHECKING:
import gilda
# Names exported by ``from <module> import *``
__all__ = [
    "Record",
    "ensure_summaries",
    "get_records",
    "ground_researcher",
    "ground_researcher_unambiguous",
    "iter_records",
]

# Module-level logger named after this module
logger = logging.getLogger(__name__)
class VersionInfo(NamedTuple):
    """Metadata describing one release of the ORCID public data file."""

    #: Label of the release; also used as the storage sub-directory name
    version: str
    #: URL the summaries archive is downloaded from
    url: str
    #: File name given to the downloaded archive
    fname: str
    #: Value passed as the progress bar total when iterating over records
    size: int
#: See https://orcid.figshare.com/articles/dataset/ORCID_Public_Data_File_2023/24204912/1
#: and download the "summaries" file. Skip all the activities files.
#: ``size`` is used as the progress bar total when iterating over the records.
VERSION_2023 = VersionInfo(
    version="2023",
    url="https://orcid.figshare.com/ndownloader/files/42479943",
    fname="ORCID_2023_10_summaries.tar.gz",
    size=18_600_000,
)
#: XML namespace prefixes, passed as ``namespaces=`` to every ``find``/``findall``/``findtext``
#: call in this module so that the XPath expressions can use the short prefixes
NAMESPACES = {
    "personal-details": "http://www.orcid.org/ns/personal-details",
    "common": "http://www.orcid.org/ns/common",
    "other-name": "http://www.orcid.org/ns/other-name",
    "employment": "http://www.orcid.org/ns/employment",
    "activities": "http://www.orcid.org/ns/activities",
    "education": "http://www.orcid.org/ns/education",
    "external-identifier": "http://www.orcid.org/ns/external-identifier",
    "researcher-url": "http://www.orcid.org/ns/researcher-url",
    "email": "http://www.orcid.org/ns/email",
    "keyword": "http://www.orcid.org/ns/keyword",
    "membership": "http://www.orcid.org/ns/membership",
    "address": "http://www.orcid.org/ns/address",
    "preferences": "http://www.orcid.org/ns/preferences",
}
# Storage module for the raw download, keyed by the data release version
MODULE_RAW = pystow.module("orcid", VERSION_2023.version)
# All derived artifacts are written below the "output" sub-module
MODULE = MODULE_RAW.module("output")

# Processed records: all of them, and the subset passing ``Record.is_high_quality``
RECORDS_PATH = MODULE.join(name="records.jsonl.gz")
RECORDS_HQ_PATH = MODULE.join(name="records_hq.jsonl.gz")
SCHEMA_PATH = MODULE.join(name="schema.json")
# Table of researcher URL labels that no rule in this module recognized
URL_NAMES_PATH = MODULE.join(name="url_names.tsv")
EMAIL_PATH = MODULE.join(name="email.tsv")
PUBMEDS_PATH = MODULE.join(name="pubmeds.tsv.gz")

# Cross-reference exports
xrefs_folder = MODULE.module("xrefs")
GITHUBS_PATH = xrefs_folder.join(name="github.tsv")
XREFS_SUMMARY_PATH = xrefs_folder.join(name="README.md")
SSSOM_PATH = xrefs_folder.join(name="sssom.tsv.gz")

# Summaries about affiliations and roles
ROLES = MODULE.module("roles")
AFFILIATION_XREFS_SUMMARY_PATH = ROLES.join(name="affiliation_xref_summary.tsv")
EDUCATION_ROLE_SUMMARY_PATH = ROLES.join(name="education_role_summary.tsv.gz")
EDUCATION_ROLE_UNSTANDARDIZED_SUMMARY_PATH = ROLES.join(
    name="education_role_unstandardized_summary.tsv"
)
EMPLOYMENT_ROLE_SUMMARY_PATH = ROLES.join(name="employment_role_summary.tsv.gz")
AFFILIATION_NO_ROR_PATH = ROLES.join(name="affiliation_missing_ror.tsv")
def _norm_key(id_type):
return id_type.lower().replace(" ", "").rstrip(":")
# External identifier types that are ignored, mapped to the reason for skipping them.
# Only the keys are consulted when parsing; the values are notes for maintainers.
EXTERNAL_ID_SKIP = {
    "iAuthor": "Reuses orcid",
    "中国科学家在线": "iAuthor, but site is dead",
    "JRIN": "Reuses orcid",
    "ORCID": "redundant",
    "ORCID id": "redundant",
    "eScientist": "Reuses orcid",
    "UNE Researcher ID": "not an id",
    "UOW Scholars": "not an id",
    "US EPA VIVO": "not an id",
    "Chalmers ID": "not an id",
    "HKUST Profile": "not an id",
    "Custom": "garb",
    "Profile system identifier": "garb",
    "CTI Vitae": "dead website",
    "Pitt ID": "dead website",
    "VIVO Cornell": "dead website",
    "Technical University of Denmark CWIS": "dead website",
    "HKU ResearcherPage": "dead website",
    "Digital Author ID": "DAI is not specific service",
    "Digital Author ID (DAI)": "DAI is not specific service",
    "dai": "DAI is not specific service",
}
# Re-key by the normalized form, since lookups are done on normalized identifier types
EXTERNAL_ID_SKIP = {_norm_key(k): v for k, v in EXTERNAL_ID_SKIP.items()}
#: Mapping from ORCID keys to Bioregistry prefixes for external IDs
EXTERNAL_ID_MAPPING = {
    "ResearcherID": "wos.researcher",
    "RID": "wos.researcher",
    "Web of Science Researcher ID": "wos.researcher",
    "other-id - Web of Science": "wos.researcher",
    "Scopus Author ID": "scopus",
    "Scopus ID": "scopus",
    "ID de autor de Scopus": "scopus",
    "???person.personsources.scopusauthor???": "scopus",
    "Loop profile": "loop",
    "github": "github",
    "ISNI": "isni",
    "Google Scholar": "google.scholar",
    "gnd": "gnd",
    "Authenticus": "authenticus",
    "AuthenticusID": "authenticus",
    "AuthID": "authenticus",
    "ID Dialnet": "dialnet.author",
    "Dialnet ID": "dialnet.author",
    "SciProfiles": "sciprofiles",
    "Sciprofile": "sciprofiles",
    "Ciência ID": "cienciavitae",
    "KAKEN": "kaken",
    "Researcher Name Resolver ID": "kaken",
    "SSRN": "ssrn.author",
    "socialscienceresearchnetwork": "ssrn.author",
    "ssrnauthorpage": "ssrn.author",
    "ssrnpage": "ssrn.author",
}
# Fail at import time if a mapped prefix is not registered in the Bioregistry
# or is not the prefix the Bioregistry itself reports for that resource
for key, value in EXTERNAL_ID_MAPPING.items():
    _resource = bioregistry.get_resource(value)
    if _resource is None:
        raise ValueError(f"Unregistered prefix in EXTERNAL_ID_MAPPING for {key} - {value}")
    if _resource.prefix != value:
        raise ValueError(
            f"Mapping uses non-standard prefix for {key} - {value} should be {_resource.prefix}"
        )
# Re-key by the normalized form, since lookups are done on normalized identifier types
EXTERNAL_ID_MAPPING = {_norm_key(k): v for k, v in EXTERNAL_ID_MAPPING.items()}
# Identifier types that were already reported as unmapped, so each is only logged once
UNMAPPED_EXTERNAL_ID: set[str] = set()
# Normalized researcher URL labels (see ``_norm_key``) that mark a URL as a personal homepage
PERSONAL_KEYS = {
    "website",
    "homepage",
    "blog",
    "personalpage",
    "personalhomepage",
    "personalwebsite",
    "personalwebsites",
    "personalweb-page",
    "personalwebpage",
    "webpage",
    "personal",
    "professionalwebsite",
    "personalsite",
    "personalblog",
    "mywebsite",
    "mysite",
    "officialweb-page",
    "sitiowebpersonal",
    "paginaweb",
    "personelwebsite",
    "blogpessoal",
    "mypersonalsite",
    "mypersonalblog",
    "personalweb-site",
    "web-site",
    "professionalblog",
    "personalwebsiteandblog",
    "myweb",
    "homewebsite",
    "personalweb",
    "mypersonalwebsite",
    "blogpersonal",
}
def ensure_summaries() -> Path:
    """Ensure the ORCID summaries archive (32+ GB) is available locally.

    :returns: The local path of the archive
    """
    release = VERSION_2023
    return MODULE_RAW.ensure(url=release.url, name=release.fname)
class Work(BaseModel):
    """A model representing a creative work."""

    # Only works that carry a PubMed identifier are modeled
    pubmed: str = Field(..., title="PubMed identifier")
class Date(BaseModel):
    """A model representing a date."""

    # The year is required; month and day may be missing
    year: int
    month: int | None = None
    day: int | None = None
class Affiliation(BaseModel):
    """A model representing an affiliation (either education or employment)."""

    name: str
    # A missing end date is read elsewhere in this module as "still ongoing"
    start: Date | None = Field(None, title="Start Year")
    end: Date | None = Field(None, title="End Year")
    role: str | None = None
    xrefs: dict[str, str] = Field(default_factory=dict, title="Database Cross-references")
    # xrefs includes ror, ringgold, grid, funderregistry, lei
    # LEI see https://www.gleif.org/en/lei-data/gleif-concatenated-file/download-the-concatenated-file

    @property
    def ror(self) -> str | None:
        """Get the affiliation's ROR identifier, if available."""
        return self.xrefs.get("ror")
class Record(BaseModel):
    """A model representing a person."""

    orcid: str = SemanticField(..., prefix="orcid")
    name: str
    homepage: str | None = Field(None)
    locale: str | None = Field(None)
    countries: list[CountryAlpha2] = Field(
        default_factory=list, description="The ISO 3166-1 alpha-2 country codes (uppercase)"
    )
    aliases: list[str] = Field(default_factory=list)
    # Keys are prefixes (e.g., "github", "scopus"), values are local identifiers
    xrefs: dict[str, str] = Field(default_factory=dict, title="Database Cross-references")
    works: list[Work] = Field(default_factory=list)
    employments: list[Affiliation] = Field(default_factory=list)
    educations: list[Affiliation] = Field(default_factory=list)
    memberships: list[Affiliation] = Field(default_factory=list)
    emails: list[str] = Field(default_factory=list)
    keywords: list[str] = Field(default_factory=list)
    commons_image: str | None = None

    @property
    def commons_image_url(self) -> str | None:
        """Get the Wikimedia Commons image URL, if available."""
        if self.commons_image:
            return f"http://commons.wikimedia.org/wiki/Special:FilePath/{self.commons_image}"
        return None

    def is_high_quality(self) -> bool:
        """Return if the record is high quality.

        A record qualifies when it has at least one employment or education with
        a ROR cross-reference, at least one work, or at least one cross-reference.
        Memberships are currently not considered.
        """
        # just see if there's literally anything in there
        return bool(
            any("ror" in employment.xrefs for employment in self.employments)
            or any("ror" in education.xrefs for education in self.educations)
            or self.works
            or self.xrefs
        )

    @property
    def email(self) -> str | None:
        """Get the first email, if available."""
        return self.emails[0] if self.emails else None

    @property
    def country(self) -> str | None:
        """Get the first country, if available."""
        return self.countries[0] if self.countries else None

    @property
    def github(self) -> str | None:
        """Get the researcher's GitHub username, if available."""
        return self.xrefs.get("github")

    @property
    def linkedin(self) -> str | None:
        """Get the researcher's LinkedIn username, if available."""
        return self.xrefs.get("linkedin")

    @property
    def loop(self) -> str | None:
        """Get the researcher's Loop identifier, if available."""
        return self.xrefs.get("loop")

    @property
    def wos(self) -> str | None:
        """Get the researcher's Web of Science identifier, if available."""
        return self.xrefs.get("wos.researcher")

    @property
    def dblp(self) -> str | None:
        """Get the researcher's DBLP identifier, if available."""
        # The parser in this module stores DBLP identifiers under "dblp.author";
        # the bare "dblp" key is still checked for records that used it
        return self.xrefs.get("dblp.author") or self.xrefs.get("dblp")

    @property
    def scopus(self) -> str | None:
        """Get the researcher's Scopus identifier, if available."""
        return self.xrefs.get("scopus")

    @property
    def google(self) -> str | None:
        """Get the researcher's Google Scholar identifier, if available."""
        return self.xrefs.get("google.scholar")

    @property
    def wikidata(self) -> str | None:
        """Get the researcher's Wikidata identifier, if available."""
        return self.xrefs.get("wikidata")

    @property
    def mastodon(self) -> str | None:
        """Get the researcher's Mastodon handle, if available."""
        return self.xrefs.get("mastodon")

    @property
    def current_affiliation_ror(self) -> str | None:
        """Guess the current affiliation and return its ROR identifier, if available."""
        # assume that if there are employments listed that are not over yet,
        # then these surpass education
        for employment in self.employments:
            if employment.ror and employment.end is None:
                return employment.ror
        for education in self.educations:
            if education.ror and education.end is None:
                return education.ror
        return None
def _iter_tarfile_members(path: Path):
tar_file = tarfile.open(path)
while member := tar_file.next():
if not member.name.endswith(".xml"):
continue
yield tar_file.extractfile(member)
tar_file.close()
def iter_records(
    *, force: bool = False, records_path: Path | None = None, desc: str = "Loading ORCID"
) -> Iterable[Record]:
    """Iterate over ORCID records, parsing the summary XML files if needed (takes about an hour).

    If a cached records file exists and ``force`` is false, records are read back
    from it. Otherwise, the summaries archive is parsed member by member; each
    record is written to the records file (and to the high quality records file
    if it qualifies) before being yielded. After the last record, a table of
    unrecognized researcher URL labels is written.

    :param force: If true, re-parse the archive even if the records file exists
    :param records_path: Path of the records file; defaults to ``RECORDS_PATH``
    :param desc: Description shown on the progress bar
    :yields: Records, one per person
    """
    if records_path is None:
        records_path = RECORDS_PATH
    if not force and records_path.is_file():
        tqdm.write(f"reading cached records from {records_path}")
        # records contain non-ASCII names, so do not rely on the platform default encoding
        with gzip.open(records_path, "rt", encoding="utf-8") as file:
            for line in tqdm(
                file, unit_scale=True, unit="line", desc=desc, total=VERSION_2023.size
            ):
                yield Record.model_validate_json(line)
    else:
        # imported lazily so they are only needed when actually parsing
        from orcid_downloader.ror import get_ror_grounder
        from orcid_downloader.wikidata import get_orcid_to_commons_image, get_orcid_to_wikidata

        ror_grounder = get_ror_grounder()
        orcid_to_wikidata = get_orcid_to_wikidata()
        orcid_to_wikimedia_commons = get_orcid_to_commons_image()
        f = partial(
            _process_file,
            ror_grounder=ror_grounder,
            orcid_to_wikidata=orcid_to_wikidata,
            orcid_to_wikimedia_commons=orcid_to_wikimedia_commons,
        )
        path = ensure_summaries()
        it = _iter_tarfile_members(path)
        # TODO use process_map with chunksize=50_000
        with (
            gzip.open(records_path, "wt", encoding="utf-8") as records_file,
            gzip.open(RECORDS_HQ_PATH, "wt", encoding="utf-8") as records_hq_file,
        ):
            for file in tqdm(
                it, unit_scale=True, unit="record", desc=desc, total=VERSION_2023.size
            ):
                record: Record | None = f(file)
                if record is None:
                    continue
                line = record.model_dump_json(exclude_defaults=True, indent=None) + "\n"
                records_file.write(line)
                if record.is_high_quality():
                    records_hq_file.write(line)
                yield record

        # the UNKNOWN_NAMES* globals are filled as a side effect of parsing researcher URLs
        with URL_NAMES_PATH.open("w", newline="", encoding="utf-8") as file:
            writer = csv.writer(file, delimiter="\t")
            writer.writerow(("norm_name", "name", "count", "example"))
            writer.writerows(
                (norm_name, UNKNOWN_NAMES_FULL[norm_name], count, UNKNOWN_NAMES_EXAMPLES[norm_name])
                for norm_name, count in UNKNOWN_NAMES.most_common()
            )
def get_records(*, force: bool = False) -> dict[str, Record]:
    """Load all records into a dictionary keyed by ORCID identifier.

    Parsing the ORCID summary XML files takes about an hour.

    :param force: Passed through to :func:`iter_records`
    :returns: A mapping from ORCID identifier to record
    """
    records: dict[str, Record] = {}
    for record in iter_records(force=force):
        records[record.orcid] = record
    return records
def _process_file(  # noqa:C901
    file,
    ror_grounder: gilda.Grounder,
    orcid_to_wikidata: dict[str, str],
    orcid_to_wikimedia_commons: dict[str, str],
) -> Record | None:
    """Process a file object for an XML file.

    :param file: An XML file object
    :param ror_grounder: A grounder object for ROR
    :param orcid_to_wikidata: A one-to-one mapping from ORCID to Wikidata identifiers
    :param orcid_to_wikimedia_commons: A mapping from ORCID to Wikimedia Commons image tags
    :return: A record, or None if the file has no ORCID identifier or no usable name

    .. code-block:: python

        grounder = get_ror_grounder()
        with open("../../example.xml") as file:
            print(
                _process_file(file, grounder, {}, {}).model_dump_json(
                    indent=2,
                    exclude_none=True,
                    exclude_unset=True,
                    exclude_defaults=True,
                )
            )
    """
    tree = etree.parse(file)  # noqa:S320
    orcid = tree.findtext(".//common:path", namespaces=NAMESPACES)
    if not orcid:
        return None

    family_name = tree.findtext(".//personal-details:family-name", namespaces=NAMESPACES)
    given_names = tree.findtext(".//personal-details:given-names", namespaces=NAMESPACES)
    if family_name and given_names:
        label_name = f"{given_names.strip()} {family_name.strip()}"
    else:
        label_name = None

    credit_name = tree.findtext(".//personal-details:credit-name", namespaces=NAMESPACES)
    if credit_name:
        credit_name = credit_name.strip()

    if not credit_name and not label_name:
        # Skip records that don't have any kinds of labels
        return None

    aliases: set[str] = set()
    if not credit_name:
        name = label_name
    else:
        # the credit name wins; the "given family" label becomes an alias
        name = credit_name
        if label_name is not None:
            aliases.add(label_name)

    # name is guaranteed to be a non-empty string here because of the guard above
    name = clean_name(name)
    aliases.update(_iter_other_names(tree))
    aliases.discard(name)  # make sure there's no duplicate
    name, aliases = _reconcile_aliass(name, aliases)

    record: dict[str, Any] = {"orcid": orcid, "name": name}
    if aliases:
        record["aliases"] = sorted(aliases)

    employments = _get_employments(tree, grounder=ror_grounder)
    if employments:
        record["employments"] = employments

    educations = _get_educations(tree, grounder=ror_grounder)
    if educations:
        record["educations"] = educations

    memberships = _get_memberships(tree, grounder=ror_grounder)
    if memberships:
        record["memberships"] = memberships

    ids, homepage = _get_external_identifiers(tree, orcid=orcid)
    # the externally supplied Wikidata mapping overrides one found in the record itself
    if wikidata_id := orcid_to_wikidata.get(orcid):
        ids["wikidata"] = wikidata_id
    if ids:
        record["xrefs"] = ids
    if homepage:
        record["homepage"] = homepage
    if image := orcid_to_wikimedia_commons.get(orcid):
        record["commons_image"] = image
    if works := _get_works(tree, orcid=orcid):
        record["works"] = works
    if emails := _get_emails(tree):
        record["emails"] = emails
    if keywords := _get_keywords(tree):
        record["keywords"] = sorted(keywords)
    if countries := _get_countries(tree, orcid=orcid):
        record["countries"] = countries
    if locale := _get_locale(tree, orcid=orcid):
        record["locale"] = locale

    # model_validate is the pydantic v2 replacement for the deprecated parse_obj,
    # matching the other v2 calls in this module (model_validate_json, model_dump_json)
    return Record.model_validate(record)
def _reconcile_aliass(name: str | None, aliases: set[str]) -> tuple[str | None, set[str]]:
# TODO if there is a comma in the main name picked, try and find an alias with no commas
return name, aliases
def _iter_other_names(t) -> Iterable[str]:
    """Yield cleaned alternative names from the "other names" section of a record.

    Each entry may hold several names separated by semicolons. Only parts that
    contain a space and are shorter than 60 characters are kept.

    :param t: The parsed XML tree of a record
    :yields: Cleaned names
    """
    for element in t.findall(".//other-name:content", namespaces=NAMESPACES):
        text = element.text
        if not text:
            # an empty element has ``text`` set to None
            continue
        for candidate in text.split(";"):
            candidate = candidate.strip()
            if " " in candidate and len(candidate) < 60:
                yield clean_name(candidate)
# Disambiguation sources that have no handling rule, mapped to the last link seen for them;
# used so that each unhandled source is only logged once
UNKNOWN_SOURCES = {}
# Disambiguation sources whose lowercased name is used directly as the cross-reference key
LOWERCASE_THESE_SOURCES = {"RINGGOLD", "GRID", "LEI"}
# Bookkeeping on researcher URL labels that no rule recognized, keyed by normalized label:
# number of occurrences, an example URL, and the label as it was last written
UNKNOWN_NAMES: typing.Counter[str] = Counter()
UNKNOWN_NAMES_EXAMPLES: dict[str, str] = {}
UNKNOWN_NAMES_FULL: dict[str, str] = {}
def _get_external_identifiers(tree, orcid) -> tuple[dict[str, str], str | None]:  # noqa:C901
    """Extract cross-references and a homepage from a record.

    Cross-references come from two places: the explicit external identifiers
    (mapped through ``EXTERNAL_ID_MAPPING``) and the researcher URLs, which are
    matched against known URL shapes. Researcher URLs whose label is not
    recognized are tallied in the ``UNKNOWN_NAMES*`` globals.

    :param tree: The parsed XML tree of a record
    :param orcid: The ORCID identifier of the record, used in log messages
    :returns: A pair of a prefix-to-identifier dictionary and the homepage URL (or None)
    """
    rv: dict[str, str] = {}
    homepage = None

    for element in tree.findall(
        ".//external-identifier:external-identifiers/external-identifier:external-identifier",
        namespaces=NAMESPACES,
    ):
        local_unique_identifier = element.findtext(
            ".//common:external-id-value", namespaces=NAMESPACES
        )
        if not local_unique_identifier:
            continue
        id_type = element.findtext(".//common:external-id-type", namespaces=NAMESPACES)
        if not id_type:
            # without a type there is nothing to map (and nothing to normalize)
            continue
        id_type_norm = _norm_key(id_type)
        if id_type_norm in EXTERNAL_ID_SKIP:
            continue
        prefix = EXTERNAL_ID_MAPPING.get(id_type_norm)
        if not prefix:
            # only report each unmapped identifier type once
            if id_type not in UNMAPPED_EXTERNAL_ID:
                UNMAPPED_EXTERNAL_ID.add(id_type)
                id_url = element.findtext(".//common:external-id-url", namespaces=NAMESPACES)
                tqdm.write(
                    f"[{orcid}] unknown id '{id_type}' w/ val "
                    f"'{local_unique_identifier}' at {id_url}"
                )
            continue
        if prefix == "wikidata" and not local_unique_identifier.startswith("Q"):
            continue
        rv[prefix] = local_unique_identifier

    for element in tree.findall(
        ".//researcher-url:researcher-urls/researcher-url:researcher-url", namespaces=NAMESPACES
    ):
        name = element.findtext(".//researcher-url:url-name", namespaces=NAMESPACES)
        raw_url = element.findtext(".//researcher-url:url", namespaces=NAMESPACES)
        if not raw_url:
            # an entry without a URL carries no information
            continue
        url = raw_url.rstrip("/")
        # the first URL labeled as a personal page becomes the homepage (scheme is kept)
        if name and homepage is None and _norm_key(name) in PERSONAL_KEYS:
            homepage = url
            continue

        url = url.removeprefix("https://")
        url = url.removeprefix("Https://")
        url = url.removeprefix("http://")

        if url.startswith(("github.com/", "www.github.com/")):
            identifier = url.removeprefix("www.").removeprefix("github.com/")
            identifier = identifier.split("?")[0]  # remove trash like ?tab=repositories
            if "/" not in identifier:  # i.e., this is not a specific repo
                rv["github"] = identifier
        elif url.startswith("twitter.com/") or url.startswith("x.com/"):
            continue  # skip twitter, it's not reasonable to participate on this platform anymore
        elif "facebook" in url or "instagram" in url:
            continue  # skip social media
        elif url.startswith("www.wikidata.org/wiki/"):
            identifier = url.removeprefix("www.wikidata.org/wiki/")
            rv["wikidata"] = identifier
        elif url.startswith("tools.wmflabs.org/scholia/author/"):
            identifier = url.removeprefix("tools.wmflabs.org/scholia/author/")
            rv["wikidata"] = identifier
        elif "linkedin.com/in/" in url:  # multiple languages subdomains, so startswith doesn't work
            identifier = url.rstrip("/").split("linkedin.com/in/")[1]
            rv["linkedin"] = unquote(identifier)
        elif "scholar.google" in url:
            parsed_url = urlparse(url)
            query_params = parse_qs(parsed_url.query)
            identifier = query_params.get("user", [None])[0]
            if identifier:
                rv["google.scholar"] = identifier
        elif url.startswith("publons.com/author/"):
            identifier = url.removeprefix("publons.com/author/").split("/")[0]
            rv["publons.researcher"] = identifier
        elif url.startswith("www.researchgate.net/profile/"):
            identifier = url.removeprefix("www.researchgate.net/profile/")
            rv["researchgate.profile"] = identifier
        elif url.startswith("www.scopus.com/authid/detail.uri?authorId="):
            identifier = url.removeprefix("www.scopus.com/authid/detail.uri?authorId=")
            rv["scopus"] = identifier
        elif url.startswith("www.webofscience.com/wos/author/record/"):
            identifier = url.removeprefix("www.webofscience.com/wos/author/record/")
            rv["wos.researcher"] = identifier
        elif url.startswith("lattes.cnpq.br/"):
            rv["lattes"] = url.removeprefix("lattes.cnpq.br/")
        elif url.startswith("dialnet.unirioja.es/servlet/autor?codigo="):
            rv["dialnet.author"] = url.removeprefix("dialnet.unirioja.es/servlet/autor?codigo=")
        elif url.startswith("papers.ssrn.com/sol3/cf_dev/AbsByAuth.cfm?per_id="):
            rv["ssrn.author"] = url.removeprefix(
                "papers.ssrn.com/sol3/cf_dev/AbsByAuth.cfm?per_id="
            )
        elif url.startswith("osf.io/"):
            rv["osf"] = url.removeprefix("osf.io/")
        elif url.startswith("viaf.org/viaf/"):
            rv["viaf"] = url.removeprefix("viaf.org/viaf/")
        elif url.startswith("ieeexplore.ieee.org/author/"):
            rv["ieee.author"] = url.removeprefix("ieeexplore.ieee.org/author/")
        elif url.startswith("loop.frontiersin.org/people/"):
            loop_identifier = (
                url.removeprefix("loop.frontiersin.org/people/")
                .removesuffix("/overview")
                .removesuffix("/bio")
            )
            rv["loop"] = loop_identifier
        elif url.startswith("dblp.org/pid/"):
            rv["dblp.author"] = url.removeprefix("dblp.org/pid/").removesuffix(".html")
        elif url.startswith("dblp.uni-trier.de/pid/"):
            rv["dblp.author"] = url.removeprefix("dblp.uni-trier.de/pid/").removesuffix(".html")
        elif url.startswith("hub.docker.com/u/"):
            rv["dockerhub.user"] = url.removeprefix("hub.docker.com/u/")
        elif name:
            if name.lower() == "mastodon":
                try:
                    host, username = url.rstrip("/").rsplit("/", 1)
                except ValueError:
                    tqdm.write(f"[{orcid}] malformed mastodon URL: {url}")
                else:
                    host = host.removesuffix("/web")
                    host = host.removesuffix("/media")
                    rv["mastodon"] = f"{username}@{host}"
            else:
                # remember unrecognized labels so they can be reviewed later
                norm_name = _norm_key(name)
                UNKNOWN_NAMES[norm_name] += 1
                UNKNOWN_NAMES_FULL[norm_name] = name
                UNKNOWN_NAMES_EXAMPLES[norm_name] = url
        # else, no name, nothing to do here. maybe add some logging?

    return rv, homepage
def _get_emails(tree) -> list[str]:
    """Get the stripped email addresses listed in a record.

    Elements without text are skipped, as is done for keywords.

    :param tree: The parsed XML tree of a record
    :returns: A list of email addresses, in document order
    """
    return [
        email.text.strip()
        for email in tree.findall(".//email:emails/email:email/email:email", namespaces=NAMESPACES)
        if email.text
    ]
def _get_keywords(tree) -> Iterable[str]:
    """Get the stripped keywords listed in a record, skipping elements without text."""
    found = []
    for element in tree.findall(
        ".//keyword:keywords/keyword:keyword/keyword:content", namespaces=NAMESPACES
    ):
        text = element.text
        if text:
            found.append(text.strip())
    return found
def _get_countries(tree, orcid) -> list[str]:
    """Get the uppercased two-letter country codes from the addresses of a record.

    :param tree: The parsed XML tree of a record
    :param orcid: The ORCID identifier of the record, used in log messages
    :returns: The valid country codes, in document order
    """
    codes = []
    for element in tree.findall(
        ".//address:addresses/address:address/address:country", namespaces=NAMESPACES
    ):
        raw = element.text
        if not raw:
            continue
        code = raw.strip().upper()
        if code == "XK":
            # XK is a proposed code for Kosovo, but isn't valid.
            # Only an issue for a few dozen records
            continue
        if code not in _index_by_alpha2():
            tqdm.write(f"[{orcid}] invalid 2 letter country code: {code}")
            continue
        codes.append(code)
    return codes
def _get_locale(tree, orcid) -> str | None:
    """Get the stripped locale from the preferences of a record, if there is one.

    :param tree: The parsed XML tree of a record
    :param orcid: The ORCID identifier of the record (currently unused)
    :returns: The locale, or None if the record does not have one
    """
    locale = tree.findtext(".//preferences:preferences/preferences:locale", namespaces=NAMESPACES)
    return None if locale is None else locale.strip()
def _get_works(tree, orcid) -> list[dict[str, str]]:
    """Get the works of a record that have a PubMed identifier.

    Every external identifier of each work group is inspected, not only the
    first one, so a PubMed identifier listed after, e.g., a DOI is not missed.

    :param tree: The parsed XML tree of a record
    :param orcid: The ORCID identifier of the record, used in log messages
    :returns: A sorted, de-duplicated list of dictionaries with a ``pubmed`` key
    """
    # get a subset of all works with pubmed IDs. TODO extend to other IDs
    pmids: set[str] = set()
    for group in tree.findall(
        ".//activities:works/activities:group/common:external-ids", namespaces=NAMESPACES
    ):
        # NOTE(review): assumes each identifier is wrapped in its own common:external-id
        # element; if none is found, fall back to treating the container as one identifier
        external_ids = group.findall(".//common:external-id", namespaces=NAMESPACES) or [group]
        for external_id in external_ids:
            id_type = external_id.findtext(".//common:external-id-type", namespaces=NAMESPACES)
            if id_type != "pmid":
                continue
            value: str | None = external_id.findtext(
                ".//common:external-id-value", namespaces=NAMESPACES
            )
            if not value:
                continue
            value_std = _standardize_pubmed(value)
            if not value_std:
                continue
            if not value_std.isnumeric():
                tqdm.write(f"[{orcid}] unstandardized PubMed: '{value}'")
                continue
            pmids.add(value_std)
    return [{"pubmed": pmid} for pmid in sorted(pmids)]
PUBMED_PREFIXES = [
"http://www.ncbi.nlm.nih.gov/pubmed/",
"https://www.ncbi.nlm.nih.gov/pubmed/",
"https://www-ncbi-nlm-nih-gov.proxy.bib.ucl.ac.be:2443/pubmed/",
"http://europepmc.org/abstract/med/",
"https://pubmed.ncbi.nlm.nih.gov/",
"www.ncbi.nlm.nih.gov/pubmed/",
"PMID: ",
"PMID:",
"PubMed PMID: ",
"MEDLINE:",
"[PMID: ",
"PubMed:",
"PubMed ID: ",
"ncbi.nlm.nih.gov/pubmed/",
"PMid:",
"PubMed ",
"PMID",
"PMID ",
]
def _standardize_pubmed(pubmed: str) -> str | None:
"""Standardize a pubmed field.
:param pubmed: A string that might somehow represent a pubmed identifier
:returns: A cleaned pubmed identifier, if possible
2023 statistics:
- correct: 3,175,196 (99.85%)
- needs processing: 2,832 (0.09%)
- junk: 2,080 (0.07%)
what was in here? a mashup of:
- DOIs
- PMC identifiers,
- a few stray strings that contain a combination of pubmed, PMC,
- a lot with random text (keywords)
- some with full text citations
"""
pubmed = pubmed.strip().strip(".").rstrip("/").strip()
if pubmed.isnumeric():
return pubmed
for x in PUBMED_PREFIXES:
if pubmed.startswith(x):
parts = pubmed.removeprefix(x).strip().split()
if parts:
return parts[0]
if pubmed.endswith("E7"):
pubmed = str(int(float(pubmed)))
return pubmed
return None
def _get_employments(tree, grounder: gilda.Grounder):
    """Get the affiliations described by the employment summaries of a record."""
    return _get_affiliations(
        tree.findall(".//employment:employment-summary", namespaces=NAMESPACES),
        grounder,
    )
def _get_educations(tree, grounder: gilda.Grounder):
    """Get the affiliations described by the education summaries of a record."""
    xpath = ".//activities:educations//education:education-summary"
    summaries = tree.findall(xpath, namespaces=NAMESPACES)
    return _get_affiliations(summaries, grounder)
def _get_memberships(tree, grounder: gilda.Grounder):
    """Get the affiliations described by the membership summaries of a record."""
    xpath = ".//activities:memberships//membership:membership-summary"
    summaries = tree.findall(xpath, namespaces=NAMESPACES)
    return _get_affiliations(summaries, grounder)
def _get_affiliations(elements, grounder: gilda.Grounder):
    """Convert affiliation summary elements into dictionaries.

    Elements without an organization or without an organization name are skipped.

    :param elements: Employment, education, or membership summary elements
    :param grounder: A grounder passed on for looking up ROR identifiers by name
    :returns: A list of dictionaries with ``name`` and ``xrefs`` keys, plus
        ``start``, ``end``, and ``role`` when available
    """
    affiliations = []
    for element in elements:
        if element is None:
            continue
        organization = element.find(".//common:organization", namespaces=NAMESPACES)
        if organization is None:
            continue
        name = organization.findtext(".//common:name", namespaces=NAMESPACES)
        if not name:
            continue

        xrefs = _get_disambiguated_organization(organization, name, grounder)
        affiliation = {"name": name.strip(), "xrefs": xrefs}

        start_element = element.find(".//common:start-date", namespaces=NAMESPACES)
        if start_element is not None:
            affiliation["start"] = _get_date(start_element)

        end_element = element.find(".//common:end-date", namespaces=NAMESPACES)
        if end_element is not None:
            affiliation["end"] = _get_date(end_element)

        role = _get_role(element)
        if role:
            affiliation["role"] = role

        affiliations.append(affiliation)
    return affiliations
def _get_date(date_element) -> Date | None:
    """Build a date from a date element, or return None if it has no year."""
    year = date_element.findtext(".//common:year", namespaces=NAMESPACES)
    if year is not None:
        month = date_element.findtext(".//common:month", namespaces=NAMESPACES)
        day = date_element.findtext(".//common:day", namespaces=NAMESPACES)
        return Date(year=year, month=month, day=day)
    return None
def _get_disambiguated_organization(organization_element, name, grounder) -> dict[str, str]:
    """Collect cross-references for an organization.

    :param organization_element: The organization element of an affiliation
    :param name: The organization's name, grounded when no ROR identifier is given
    :param grounder: An object whose ``ground_best`` method looks up the name
    :returns: A dictionary from source key (e.g., ``ror``) to identifier
    """
    xrefs = {}
    for disambiguated in organization_element.findall(
        ".//common:disambiguated-organization", namespaces=NAMESPACES
    ):
        source = disambiguated.findtext(".//common:disambiguation-source", namespaces=NAMESPACES)
        identifier = disambiguated.findtext(
            ".//common:disambiguated-organization-identifier", namespaces=NAMESPACES
        )
        if not identifier:
            continue
        identifier = identifier.strip()
        if source == "ROR":
            xrefs["ror"] = identifier.removeprefix("https://ror.org/")
            continue
        if source in LOWERCASE_THESE_SOURCES:
            xrefs[source.lower()] = identifier
            continue
        if source == "FUNDREF":
            xrefs["funderregistry"] = identifier.removeprefix("http://dx.doi.org/10.13039/")
            continue
        if source not in UNKNOWN_SOURCES:
            # report each unhandled source only once
            tqdm.write(f"unhandled source: {source} / link: {identifier}")
            UNKNOWN_SOURCES[source] = identifier

    if "ror" in xrefs:
        return xrefs
    # no explicit ROR identifier was given, so try to ground the name instead
    scored_match = grounder.ground_best(name)
    if scored_match:
        xrefs["ror"] = scored_match.term.id
    return xrefs
#: Role text needs to be at least this long; shorter roles are discarded
MINIMUM_ROLE_LENGTH = 4
def _get_role(element) -> str | None:
    """Get the standardized role title of an affiliation, if it is long enough."""
    raw_role = element.findtext(".//common:role-title", namespaces=NAMESPACES)
    if not raw_role:
        return None
    standardized, _ = standardize_role(raw_role)
    return standardized if len(standardized) >= MINIMUM_ROLE_LENGTH else None
def ground_researcher(name: str) -> list[gilda.ScoredMatch]:
    """Ground a name based on ORCID names/aliases.

    :param name: The name of a researcher
    :returns: The matches returned by the ORCID grounder
    """
    # imported inside the function rather than at module level
    from .lexical import get_orcid_grounder

    grounder = get_orcid_grounder()
    return grounder.ground(name)
def ground_researcher_unambiguous(name: str) -> str | None:
    """Ground a name based on ORCID names/aliases, only if there is exactly one match.

    :param name: The name of a researcher
    :returns: The identifier of the single match, or None if there are zero or several
    """
    matches = ground_researcher(name)
    if len(matches) == 1:
        only_match = matches[0]
        return only_match.term.id
    return None
def write_schema() -> None:
    """Write the JSON schema of the record model to ``SCHEMA_PATH``."""
    serialized = json.dumps(Record.model_json_schema(), indent=2)
    SCHEMA_PATH.write_text(serialized)
def write_summaries(*, force: bool = False):  # noqa:C901
    """Write summary files.

    Iterates over all records once and writes the email, GitHub, PubMed, and
    SSSOM tables as it goes, then writes the counters collected along the way
    (cross-reference prefixes, roles, affiliations without ROR identifier).

    :param force: Passed through to :func:`iter_records`
    """
    from tabulate import tabulate

    # count affiliations (breakdown by employer, education, combine)
    # count roles
    # count records with email
    has_email = 0
    has_github = 0
    xrefs_counter: Counter[str] = Counter()
    affiliation_xrefs_counter: Counter[str] = Counter()
    education_roles: Counter[str] = Counter()
    unstandardized_education_roles: Counter[str] = Counter()
    unstandardized_education_roles_example: dict[str, str] = {}
    employment_roles: Counter[str] = Counter()
    affiliation_no_ror: Counter[str] = Counter()
    affiliation_no_ror_example: dict[str, str] = {}

    with (
        open(GITHUBS_PATH, "w") as githubs_file,
        open(EMAIL_PATH, "w") as emails_file,
        gzip.open(PUBMEDS_PATH, "wt") as pubmeds_file,
        gzip.open(SSSOM_PATH, "wt") as sssom_file,
    ):
        emails_writer = csv.writer(emails_file, delimiter="\t")
        emails_writer.writerow(("orcid", "email"))
        githubs_writer = csv.writer(githubs_file, delimiter="\t")
        githubs_writer.writerow(("orcid", "github"))
        pubmeds_writer = csv.writer(pubmeds_file, delimiter="\t")
        pubmeds_writer.writerow(("orcid", "pubmed"))
        # TODO write out bioregistry prefixes in sssom_file
        sssom_writer = csv.writer(sssom_file, delimiter="\t")
        sssom_writer.writerow(
            ("subject_id", "subject_label", "predicate_id", "object_id", "mapping_justification")
        )

        for record in iter_records(force=force, desc="Writing summaries"):
            if record.emails:
                has_email += 1
                for email in record.emails:
                    emails_writer.writerow((record.orcid, email))

            sssom_writer.writerows(
                (
                    f"orcid:{record.orcid}",
                    record.name,
                    "skos:exactMatch",
                    f"{k}:{v}",
                    "semapv:ManualMappingCuration",
                )
                for k, v in sorted(record.xrefs.items())
            )

            if github := record.xrefs.get("github"):
                githubs_writer.writerow((record.orcid, github))
                has_github += 1

            for k in record.xrefs:
                xrefs_counter[k] += 1

            for education in record.educations:
                if education.role:
                    role_std, did_std = standardize_role(education.role)
                    education_roles[role_std] += 1
                    if not did_std:
                        unstandardized_education_roles[education.role] += 1
                        # keep the first ORCID identifier seen as an example
                        if education.role not in unstandardized_education_roles_example:
                            unstandardized_education_roles_example[education.role] = record.orcid
                for k in education.xrefs:
                    affiliation_xrefs_counter[k] += 1
                if "ror" not in education.xrefs:  # and not grounder.ground(education.name):
                    affiliation_no_ror[education.name] += 1
                    if education.name not in affiliation_no_ror_example:
                        affiliation_no_ror_example[education.name] = record.orcid

            for employment in record.employments:
                if employment.role:
                    employment_roles[employment.role] += 1
                for k in employment.xrefs:
                    affiliation_xrefs_counter[k] += 1
                if "ror" not in employment.xrefs:  # and not grounder.ground(employment.name):
                    affiliation_no_ror[employment.name] += 1
                    if employment.name not in affiliation_no_ror_example:
                        affiliation_no_ror_example[employment.name] = record.orcid

            for membership in record.memberships:
                # TODO role standardization?
                # check the membership itself, not the loop variable left over from employments
                if "ror" not in membership.xrefs:  # and not grounder.ground(membership.name):
                    affiliation_no_ror[membership.name] += 1
                    if membership.name not in affiliation_no_ror_example:
                        affiliation_no_ror_example[membership.name] = record.orcid

            for work in record.works:
                pubmed = _standardize_pubmed(work.pubmed)
                if pubmed:
                    pubmeds_writer.writerow((record.orcid, pubmed))

    XREFS_SUMMARY_PATH.write_text(
        f"""\
# Cross References Summary
{tabulate(xrefs_counter.most_common(), tablefmt='github', headers=['prefix', 'count'])}
""".rstrip()
    )

    with AFFILIATION_XREFS_SUMMARY_PATH.open("w") as file:
        write_counter(file, ("prefix", "count"), affiliation_xrefs_counter)

    with gzip.open(EDUCATION_ROLE_SUMMARY_PATH, "wt") as file:
        write_counter(file, ("role", "count"), education_roles)

    with open(EDUCATION_ROLE_UNSTANDARDIZED_SUMMARY_PATH, "w") as file:
        write_counter(
            file,
            ("role", "count"),
            unstandardized_education_roles,
            examples=unstandardized_education_roles_example,
        )

    with open(AFFILIATION_NO_ROR_PATH, "w") as file:
        write_counter(
            file, ("name", "count"), affiliation_no_ror, examples=affiliation_no_ror_example
        )

    with gzip.open(EMPLOYMENT_ROLE_SUMMARY_PATH, "wt") as file:
        write_counter(file, ("role", "count"), employment_roles)
def write_counter(file, header, counter, examples=None) -> None:
    """Write a counter to a TSV file, most common entries first.

    :param file: A writable text file object
    :param header: The column names for the key and count columns
    :param counter: The counter to write
    :param examples: An optional mapping from key to an example; if given, an
        extra ``example`` column is written (empty for keys without an example)
    """
    writer = csv.writer(file, delimiter="\t")
    ranked = counter.most_common()
    if examples is None:
        writer.writerow(header)
        writer.writerows(ranked)
        return
    writer.writerow((*header, "example"))
    for key, count in ranked:
        writer.writerow((key, count, examples.get(key)))
def _process_example() -> Record | None:
    """Process the ``example.xml`` file located three directories above this module."""
    import gilda

    from orcid_downloader.wikidata import get_orcid_to_commons_image, get_orcid_to_wikidata

    base_directory = Path(__file__).parent.parent.parent.resolve()
    # a grounder built from no terms
    empty_grounder = gilda.Grounder([])
    commons_images = get_orcid_to_commons_image()
    wikidata_ids = get_orcid_to_wikidata()
    with base_directory.joinpath("example.xml").open() as file:
        return _process_file(file, empty_grounder, wikidata_ids, commons_images)
def _main():
    """Regenerate every output: schema, summaries, SQLite, OWL, and the Gilda resources."""
    from .lexical import write_gilda, write_lexical
    from .owl import write_owl_rdf
    from .sqldb import write_sqlite

    write_schema()
    write_summaries(force=True)

    steps = (
        ("Writing SQLite", write_sqlite),
        ("Writing OWL", write_owl_rdf),
        ("Generating Gilda TSV (~30 min)", write_gilda),
        ("Generating Gilda SQLite (~30 min)", write_lexical),
    )
    for message, step in steps:
        tqdm.write(message)
        step()

    # smoke test of the freshly written grounding resources
    print(*ground_researcher("CT Hoyt"), sep="\n")  # noqa:T201
# Run the full build when this module is executed as a script
if __name__ == "__main__":
    _main()