"""Downloading and processing functions for PubMed."""
from __future__ import annotations
import datetime
import functools
import gzip
import itertools as itt
import logging
import multiprocessing as mp
import os
import sys
import typing
from collections.abc import Iterable
from itertools import chain
from pathlib import Path
from typing import Any, Literal, TextIO, TypeAlias
from xml.etree.ElementTree import Element
import click
import requests
import ssslm
from bs4 import BeautifulSoup
from curies import NamableReference, Reference, Triple
from curies import vocabulary as v
from curies.triples import read_triples, write_triples
from lxml import etree
from more_click import verbose_option
from pydantic import BaseModel, Field
from pystow.utils import safe_open_writer
from tqdm import tqdm
from tqdm.contrib import tmap
from tqdm.contrib.concurrent import process_map, thread_map
from tqdm.contrib.logging import logging_redirect_tqdm
from .utils import (
ISSN,
MODULE,
Author,
Collective,
Heading,
parse_author,
parse_date,
parse_mesh_heading,
)
# Public API of this module: the data models plus the download ("ensure")
# and parse ("process") entry points for baseline and update files.
__all__ = [
    "AbstractText",
    "Article",
    "Journal",
    "ensure_baselines",
    "ensure_updates",
    "iterate_ensure_articles",
    "iterate_ensure_baselines",
    "iterate_ensure_updates",
    "iterate_process_articles",
    "iterate_process_baselines",
    "iterate_process_updates",
    "process_articles",
    "process_baselines",
    "process_updates",
]
logger = logging.getLogger(__name__)

# Remote directory listings that are scraped for ``pubmed*.xml.gz`` links.
BASELINE_URL = "https://ftp.ncbi.nlm.nih.gov/pubmed/baseline/"
UPDATES_URL = "https://ftp.ncbi.nlm.nih.gov/pubmed/updatefiles/"

# Storage submodules that hold the downloaded baseline and update files.
BASELINE_MODULE = MODULE.module("baseline")
UPDATES_MODULE = MODULE.module("updates")

# Output locations inside the top-level module.
EDGES_PATH = MODULE.join(name="edges.tsv.gz")  # cache used by get_edges()
SSSOM_PATH = MODULE.join(name="articles.sssom.tsv.gz")  # default target of save_sssom()
TEST_PATH = MODULE.join(name="articles-test.sssom.tsv")

# Cached copies of the HTML directory listings, see _ensure_urls().
UPDATES_PATH = MODULE.join(name="updates.html")
BASELINE_PATH = MODULE.join(name="baseline.html")
def _download_baseline(url: str) -> Path:
    """Ensure the baseline file at ``url`` via the baseline module and return its path."""
    local_path: Path = BASELINE_MODULE.ensure(url=url)
    return local_path
def _download_updates(url: str) -> Path:
    """Ensure the update file at ``url`` via the updates module and return its path."""
    local_path: Path = UPDATES_MODULE.ensure(url=url)
    return local_path
class JournalIssue(BaseModel):
    """Represents the issue of a journal in which the article was published."""

    # Every field is optional and defaults to None; _get_journal_issue() fills
    # in whatever the record's Journal/JournalIssue element provides.
    volume: str | None = None
    # Kept as free text (the source has a TODO about modelling values like "1-2").
    issue: str | None = None
    # Publication date parsed from the issue's PubDate element.
    published: datetime.date | None = None
[docs]
class Journal(BaseModel):
    """Represents a reference to a journal.

    Note, full information about a journal can be loaded elsewhere.
    """

    # Optional linking ISSN; _extract_article() reads it from ``ISSNLinking``.
    issn: str | None = Field(
        None, description="The ISSN used for linking, since there might be many"
    )
    # Required; _extract_article() reads it from ``NlmUniqueID``.
    nlm_catalog_id: str = Field(..., description="The NLM identifier for the journal")
    # All ISSN entries found for the journal, each carrying its type.
    issns: list[ISSN] = Field(default_factory=list)
[docs]
class AbstractText(BaseModel):
    """Represents an abstract text object."""

    # The text content of one AbstractText element (never empty; empty
    # sections are skipped by _extract_article()).
    text: str
    # Value of the element's ``Label`` attribute, when present.
    label: str | None = None
    # Value of the element's ``NlmCategory`` attribute, when present.
    category: str | None = None
class History(BaseModel):
    """Represents a history item."""

    # Closed set of accepted ``PubStatus`` values; anything else fails
    # validation, which _parse_pub_date() reports and skips.
    status: Literal[
        "received",
        "accepted",
        "pubmed",
        "medline",
        "entrez",
        "pmc-release",
        "revised",
        "aheadofprint",
        "retracted",
        "ecollection",
    ]
    # The date attached to this status.
    date: datetime.date
class Grant(BaseModel):
    """Represents a grant item."""

    # Grant identifier and funder acronym are both optional.
    id: str | None = None
    acronym: str | None = None
    # Free-text funding agency name as given in the record.
    # NOTE(review): this field and ``country`` are required, but _parse_grant()
    # passes ``findtext`` results that can be None -- confirm whether records
    # without these tags occur and whether they should fail validation.
    agency: str  # use ROR to ground agency
    # Grounded reference for the agency, when a grounder found a match.
    agency_reference: NamableReference | None = None
    country: str  # TODO use pydantic validation
#: Predicate linking an article to its MeSH headings; also see edam:has_topic
HAS_TOPIC = Reference(prefix="biolink", identifier="has_topic")
#: Predicate linking an article to its journal; also see biolink:published_in, EFO:0001796
IN_JOURNAL = Reference(prefix="uniprot.core", identifier="publishedIn")
#: Predicate linking an article to an article it cites
CITES = Reference(prefix="cito", identifier="cites")
[docs]
class Article(BaseModel):
    """Represents an article."""

    pubmed: int
    title: str
    date_completed: datetime.date | None = None
    date_revised: datetime.date | None = None
    type_mesh_ids: list[str] = Field(
        default_factory=list, description="A list of MeSH LUIDs for article types"
    )
    headings: list[Heading] = Field(default_factory=list)
    journal: Journal
    journal_issue: JournalIssue
    abstract: list[AbstractText] = Field(default_factory=list)
    authors: list[Author | Collective] = Field(default_factory=list)
    cites_pubmed_ids: list[str] = Field(default_factory=list)
    xrefs: list[Reference] = Field(default_factory=list)
    history: list[History] = Field(default_factory=list)
    grants: list[Grant] = Field(default_factory=list)

    @property
    def year(self) -> int | None:
        """Get the publication year of the journal issue, if its date is known."""
        published = self.journal_issue.published
        return published.year if published else None

    @property
    def date_published(self) -> datetime.date | None:
        """Get the publication date recorded on the journal issue."""
        return self.journal_issue.published

    def get_abstract(self) -> str:
        """Join all abstract sections into one space-separated string."""
        return " ".join(section.text for section in self.abstract)

    def _triples(self) -> Iterable[Triple]:
        """Yield one triple per predicate/object pair, with this article as subject."""
        subject = Reference(prefix="pubmed", identifier=str(self.pubmed))
        for predicate, obj in self._pos():
            yield Triple(subject=subject, predicate=predicate, object=obj)

    def _pos(self) -> Iterable[tuple[Reference, Reference]]:
        """Yield the predicate/object pairs that describe this article."""
        # article types
        for type_mesh_id in self.type_mesh_ids:
            yield v.rdf_type, Reference(prefix="mesh", identifier=type_mesh_id)
        # topics
        for heading in self.headings:
            yield HAS_TOPIC, Reference(prefix="mesh", identifier=heading.mesh_id)
        # journal
        yield IN_JOURNAL, Reference(prefix="nlm", identifier=self.journal.nlm_catalog_id)
        # contributors, only when they could be identified
        for contributor in self.authors:
            if isinstance(contributor, Collective) and contributor.reference:
                yield v.has_contributor, contributor.reference
            elif isinstance(contributor, Author) and contributor.orcid:
                yield v.has_contributor, Reference(prefix="orcid", identifier=contributor.orcid)
        # citations
        for cited_pubmed_id in self.cites_pubmed_ids:
            yield CITES, Reference(prefix="pubmed", identifier=cited_pubmed_id)
        # equivalent identifiers
        for xref in self.xrefs:
            yield v.exact_match, xref

    def is_retracted(self) -> bool:
        """Check if the article carries the MeSH publication type for retractions."""
        # see https://www.ncbi.nlm.nih.gov/mesh/68016441
        retracted_type = "D016441"
        return retracted_type in self.type_mesh_ids

    def is_review(self) -> bool:
        """Check if the article carries the MeSH publication type for reviews."""
        # see https://www.ncbi.nlm.nih.gov/mesh/D016454
        review_type = "D016454"
        return review_type in self.type_mesh_ids
def _ensure_urls(url: str, cache_path: Path, *, force: bool) -> list[str]:
    """List the PubMed ``.xml.gz`` file URLs linked from a directory listing.

    :param url: The URL of the directory listing; it is also prepended to each link.
    :param cache_path: Where the listing's HTML is cached.
    :param force: If true, fetch the listing again even when a cached copy exists.
    :returns: The file URLs, sorted in descending order.
    """
    if force or not cache_path.is_file():
        response = requests.get(url, timeout=300)
        response.raise_for_status()
        html = response.text
        cache_path.write_text(html)
    else:
        html = cache_path.read_text()

    file_urls = []
    for anchor in BeautifulSoup(html, "html.parser").find_all("a"):
        href = anchor.get("href")
        if href and href.startswith("pubmed") and href.endswith(".xml.gz"):  # type:ignore
            file_urls.append(url + href)  # type:ignore
    file_urls.sort(reverse=True)
    return file_urls
def _parse_from_path(
    path: Path,
    *,
    ror_grounder: ssslm.Grounder | None,
    mesh_grounder: ssslm.Grounder | None,
    author_grounder: ssslm.Grounder | None,
    position: int | None = None,
) -> Iterable[Article]:
    """Parse an XML file and yield the articles that could be extracted from it.

    A file that cannot be parsed is reported through ``tqdm.write`` and yields nothing.

    :param path: The path to the XML file.
    :param ror_grounder: Passed through to the article extractor.
    :param mesh_grounder: Passed through to the article extractor.
    :param author_grounder: Passed through to the article extractor.
    :param position: The row in which to draw the per-file progress bar.
    """
    try:
        tree = etree.parse(path)
    except etree.XMLSyntaxError:
        tqdm.write(f"failed to parse {path}")
        return

    extract = functools.partial(
        _extract_article,
        ror_grounder=ror_grounder,
        mesh_grounder=mesh_grounder,
        author_grounder=author_grounder,
    )
    progress = tqdm(
        tree.findall("PubmedArticle"),
        unit="article",
        unit_scale=True,
        leave=False,
        desc=f"parsing {path.name}",
        position=position,
    )
    for article in map(extract, progress):
        # the extractor returns None for records it decides to skip
        if article:
            yield article
def _extract_article(  # noqa:C901
    element: Element,
    *,
    ror_grounder: ssslm.Grounder | None,
    mesh_grounder: ssslm.Grounder | None,
    author_grounder: ssslm.Grounder | None,
) -> Article | None:
    """Convert a ``PubmedArticle`` element into an :class:`Article`.

    :param element: A ``PubmedArticle`` element.
    :param ror_grounder: Optional grounder used for author affiliations and grant agencies.
    :param mesh_grounder: Optional grounder passed to the MeSH heading parser.
    :param author_grounder: Optional grounder passed to the author parser.
    :returns: The article, or None when the record is skipped because it has an
        empty title, no ``MedlineJournalInfo`` section, or no ``NlmUniqueID``.
    :raises ValueError: If one of the ``MedlineCitation``, ``PMID``, ``Article``,
        ``ArticleTitle``, or ``PubmedData`` tags is missing, or the PMID is empty.
    """
    medline_citation: Element | None = element.find("MedlineCitation")
    if medline_citation is None:
        raise ValueError("article is missing MedlineCitation tag")

    pmid_tag = medline_citation.find("PMID")
    if pmid_tag is None:
        raise ValueError("article is missing PMID tag")
    if not pmid_tag.text:
        raise ValueError("article has an empty PMID tag")
    pubmed = int(pmid_tag.text)

    article = medline_citation.find("Article")
    if article is None:
        raise ValueError(f"[pubmed:{pubmed}] is missing an Article tag")

    title_tag = article.find("ArticleTitle")
    if title_tag is None:
        raise ValueError(f"[pubmed:{pubmed}] is missing an ArticleTitle tag")
    # use itertext() so titles containing inline markup are kept in full
    title = "".join(title_tag.itertext())
    # joining never produces None, so test for emptiness to detect a blank title
    if not title:
        logger.debug(
            "[pubmed:%s] has an empty ArticleTitle tag:%s",
            pubmed,
            etree.tostring(title_tag, pretty_print=True, encoding="unicode"),
        )
        return None

    pubmed_data = element.find("PubmedData")
    if pubmed_data is None:
        raise ValueError(f"[pubmed:{pubmed}] is missing a PubmedData tag")

    date_completed = parse_date(medline_citation.find("DateCompleted"))
    date_revised = parse_date(medline_citation.find("DateRevised"))

    # tolerate publication types without a UI attribute instead of raising KeyError
    types = sorted(
        ui
        for x in medline_citation.findall(".//PublicationTypeList/PublicationType")
        if (ui := x.attrib.get("UI"))
    )

    headings = [
        heading
        for x in medline_citation.findall(".//MeshHeadingList/MeshHeading")
        if (heading := parse_mesh_heading(x, mesh_grounder=mesh_grounder))
    ]

    issns = [
        ISSN(value=x.text, type=x.attrib["IssnType"])
        for x in medline_citation.findall(".//Journal/ISSN")
    ]

    medline_journal = medline_citation.find("MedlineJournalInfo")
    if medline_journal is None:
        logger.debug("[pubmed:%s] missing MedlineJournalInfo section", pubmed)
        return None

    issn_linking = medline_journal.findtext("ISSNLinking")
    nlm_catalog_id = medline_journal.findtext("NlmUniqueID")
    if not nlm_catalog_id:
        # the journal model requires this identifier, so skip the record the
        # same way a missing MedlineJournalInfo section is skipped
        logger.debug("[pubmed:%s] missing NlmUniqueID in MedlineJournalInfo", pubmed)
        return None

    journal = Journal(
        issn=issn_linking,
        nlm_catalog_id=nlm_catalog_id,
        issns=issns,
    )

    abstract_texts = []
    for abstract_text_tag in medline_citation.findall(".//Abstract/AbstractText"):
        text = "".join(abstract_text_tag.itertext())
        if not text:
            continue
        abstract_text = AbstractText(
            text=text,
            label=abstract_text_tag.attrib.get("Label"),
            category=abstract_text_tag.attrib.get("NlmCategory"),
        )
        abstract_texts.append(abstract_text)

    authors = [
        author
        for i, author_tag in enumerate(medline_citation.findall(".//AuthorList/Author"), start=1)
        if (
            author := parse_author(
                i,
                author_tag,
                ror_grounder=ror_grounder,
                doc_key=pubmed,
                author_grounder=author_grounder,
            )
        )
    ]

    # search below MedlineCitation, like every other lookup in this function
    grants = [
        _parse_grant(grant, ror_grounder=ror_grounder)
        for grant in medline_citation.findall(".//GrantList/Grant")
    ]

    # In PubMed XML, ReferenceList is a child of PubmedData rather than of
    # MedlineCitation, so the citations have to be looked up there.
    cites_pubmed_ids = [
        cites_pubmed_id
        for citation_reference_tag in pubmed_data.findall(".//ReferenceList/Reference")
        if (cites_pubmed_id := _parse_reference(citation_reference_tag))
    ]

    # Only use the ArticleIdList that is a direct child of PubmedData. A
    # descendant search would also pick up the identifiers of cited articles,
    # which live in ReferenceList/Reference/ArticleIdList.
    # The list repeats the article's own PubMed identifier, which is skipped
    # via SKIP_PREFIXES. Entries without an IdType attribute are skipped too.
    xrefs = [
        Reference(prefix=prefix, identifier=article_id_tag.text)
        for article_id_tag in pubmed_data.findall("ArticleIdList/ArticleId")
        if article_id_tag.text
        and (prefix := article_id_tag.attrib.get("IdType"))
        and prefix not in SKIP_PREFIXES
    ]

    history = [
        history
        for pubmed_date in pubmed_data.findall(".//History/PubMedPubDate")
        if (history := _parse_pub_date(pubmed_date))
    ]

    journal_issue = _get_journal_issue(article)

    return Article(
        pubmed=pubmed,
        title=title,
        date_completed=date_completed,
        date_revised=date_revised,
        type_mesh_ids=types,
        headings=headings,
        journal=journal,
        abstract=abstract_texts,
        authors=authors,
        xrefs=xrefs,
        cites_pubmed_ids=cites_pubmed_ids,
        history=history,
        journal_issue=journal_issue,
        grants=grants,
    )
def _get_journal_issue(article: Element) -> JournalIssue:
    """Build the journal issue from the ``Journal/JournalIssue`` part of an article.

    When the article has no ``Journal`` or no ``JournalIssue`` element, all
    fields of the result are None.
    """
    journal_element = article.find("Journal")
    issue_element = None if journal_element is None else journal_element.find("JournalIssue")
    if issue_element is None:
        return JournalIssue(volume=None, issue=None, published=None)

    pubdate_element = issue_element.find("PubDate")
    return JournalIssue(
        volume=issue_element.findtext("Volume"),
        # TODO create data model for issue? e.g., "1-2"
        issue=issue_element.findtext("Issue"),
        published=None if pubdate_element is None else parse_date(pubdate_element),
    )
def _parse_pub_date(element: Element) -> History | None:
    """Turn a ``PubMedPubDate`` element into a history item.

    :returns: None when the status attribute is missing, the date cannot be
        parsed, or the status is rejected by the model's validation.
    """
    pub_status = element.attrib.get("PubStatus")
    if pub_status is None:
        tqdm.write(f"missing status: {etree.tostring(element)}")
        return None

    parsed_date = parse_date(element)
    if parsed_date is None:
        return None

    try:
        return History(status=pub_status, date=parsed_date)
    except ValueError:
        tqdm.write(f"invalid status: {pub_status}")
        return None
# ``IdType`` values that _extract_article() leaves out of an article's xrefs.
SKIP_PREFIXES = {"pubmed"}
def _parse_reference(reference_tag: Element) -> str | None:
for article_id_tag in reference_tag.findall(".//ArticleIdList/ArticleId"):
if article_id_tag.attrib["IdType"] == "pubmed":
return article_id_tag.text
return None
def _parse_grant(element: Element, *, ror_grounder: ssslm.Grounder | None) -> Grant:
    """Turn a ``Grant`` element into a grant model.

    :param element: A ``Grant`` element.
    :param ror_grounder: Optional grounder used to look up the agency name.
    :returns: The grant, with an agency reference when the grounder found a match.
    """
    agency_name = element.findtext("Agency")

    agency_reference = None
    if agency_name and ror_grounder is not None:
        best_match = ror_grounder.get_best_match(agency_name)
        if best_match:
            agency_reference = best_match.reference

    return Grant(
        id=element.findtext("GrantID"),
        acronym=element.findtext("Acronym"),
        agency=agency_name,
        agency_reference=agency_reference,
        country=element.findtext("Country"),
    )
[docs]
def ensure_baselines(*, force: bool, source: Source | None = None) -> list[Path]:
    """Collect the paths of all baseline files into a list.

    See :func:`iterate_ensure_baselines` for the meaning of the arguments.
    """
    baseline_paths = iterate_ensure_baselines(force=force, source=source)
    return list(baseline_paths)
[docs]
def iterate_ensure_baselines(
    *, source: Source | None = None, force: bool = False
) -> Iterable[Path]:
    """Ensure all the baseline files are downloaded.

    :param source: Where the list of files comes from. With ``"remote"`` (or
        None), the remote listing is consulted and each file is ensured in a
        thread pool. With ``"local"``, the ``*.xml.gz`` files already present in
        the baseline directory are used.
    :param force: If true, fetch the remote listing again instead of using its
        cached copy. Only relevant for the remote source.
    :yields: The local paths of the baseline files.
    :raises ValueError: If ``source`` is not one of the supported values.
    """
    if source == "remote" or source is None:
        yield from thread_map(
            _download_baseline,
            _ensure_urls(BASELINE_URL, BASELINE_PATH, force=force),
            desc="Downloading PubMed baseline",
            leave=False,
        )
    elif source == "local":
        yield from BASELINE_MODULE.base.glob("*.xml.gz")
    else:
        # same message as iterate_ensure_updates so failures are diagnosable
        raise ValueError(f"invalid source: {source}")
# Where the list of files to process comes from: "remote" consults the
# directory listing, "local" uses the files that are already on disk.
Source: TypeAlias = Literal["remote", "local"]
[docs]
def process_baselines(
    *, force_process: bool = False, source: Source | None = None
) -> list[Article]:
    """Ensure all baseline files, process them, and return the articles as a list."""
    articles = iterate_process_baselines(force_process=force_process, source=source)
    return list(articles)
[docs]
def iterate_process_baselines(
    *,
    force_process: bool = False,
    multiprocessing: bool = False,
    ror_grounder: ssslm.Grounder | None = None,
    mesh_grounder: ssslm.Grounder | None = None,
    author_grounder: ssslm.Grounder | None = None,
    force_listing: bool = False,
    source: Source | None = None,
) -> Iterable[Article]:
    """Ensure all baseline files, then iterate over the articles in them.

    :param force_process: If true, re-parse the XML even if a processed cache exists.
    :param multiprocessing: If true, process the files in parallel worker processes.
    :param ror_grounder: Optional grounder, passed through to the parser.
    :param mesh_grounder: Optional grounder, passed through to the parser.
    :param author_grounder: Optional grounder, passed through to the parser.
    :param force_listing: Passed as ``force`` when ensuring the baseline files.
    :param source: Where the list of baseline files comes from.
    """
    baseline_paths = ensure_baselines(force=force_listing, source=source)
    return _shared_process(
        baseline_paths,
        unit="baseline",
        multiprocessing=multiprocessing,
        force_process=force_process,
        ror_grounder=ror_grounder,
        mesh_grounder=mesh_grounder,
        author_grounder=author_grounder,
    )
def _shared_process(
    paths: Iterable[Path],
    *,
    ror_grounder: ssslm.Grounder | None = None,
    mesh_grounder: ssslm.Grounder | None = None,
    author_grounder: ssslm.Grounder | None = None,
    force_process: bool = False,
    unit: str,
    multiprocessing: bool = False,
) -> Iterable[Article]:
    """Process several XML files and chain their articles into one iterable.

    :param paths: The paths of the gzipped XML files to process.
    :param ror_grounder: Optional grounder, passed through to the parser.
    :param mesh_grounder: Optional grounder, passed through to the parser.
    :param author_grounder: Optional grounder, passed through to the parser.
    :param force_process: If true, re-parse the XML even if a processed cache exists.
    :param unit: The name of one file, used in the progress bar (e.g., ``baseline``).
    :param multiprocessing: If true, process the files in worker processes.
    :returns: An iterable over the articles from all files.
    """
    tqdm_kwargs: dict[str, Any] = {
        "unit_scale": True,
        "unit": unit,
        "desc": f"Processing {unit}s",
    }
    if multiprocessing:
        # Leave two cores free, but never ask for fewer than one worker.
        # Without the clamp, machines with one or two cores would request
        # zero or a negative number of workers.
        n_workers = max(1, (os.cpu_count() or 5) - 2)
        mp.set_start_method("spawn", force=True)
        lock = mp.RLock()
        tqdm.set_lock(lock)
        # multiprocessing can't return generators, so each file's articles
        # need to be consumed into a list by the worker
        func = functools.partial(
            _process_xml_gz,
            ror_grounder=ror_grounder,
            mesh_grounder=mesh_grounder,
            author_grounder=author_grounder,
            force_process=force_process,
            n_workers=n_workers,
        )
        per_file_articles = process_map(
            func, paths, **tqdm_kwargs, chunksize=1, max_workers=n_workers
        )
    else:
        func = functools.partial(
            _iterate_process_xml_gz,
            ror_grounder=ror_grounder,
            mesh_grounder=mesh_grounder,
            author_grounder=author_grounder,
            force_process=force_process,
        )
        per_file_articles = tmap(func, paths, **tqdm_kwargs)
    return itt.chain.from_iterable(per_file_articles)
[docs]
def ensure_updates(
    *,
    force: bool,
    source: Source | None = None,
) -> list[Path]:
    """Collect the paths of all update files into a list.

    See :func:`iterate_ensure_updates` for the meaning of the arguments.
    """
    update_paths = iterate_ensure_updates(force=force, source=source)
    return list(update_paths)
[docs]
def iterate_ensure_updates(*, force: bool = False, source: Source | None = None) -> Iterable[Path]:
    """Ensure all the update files are downloaded.

    :param force: If true, fetch the remote listing again instead of using its
        cached copy. Only relevant for the remote source.
    :param source: With ``"remote"`` (or None), the remote listing is consulted
        and each file is ensured in a thread pool. With ``"local"``, the
        ``*.xml.gz`` files already present in the updates directory are used.
    :yields: The local paths of the update files.
    :raises ValueError: If ``source`` is not one of the supported values.
    """
    if source == "local":
        yield from UPDATES_MODULE.base.glob("*.xml.gz")
        return
    if source not in (None, "remote"):
        raise ValueError(f"invalid source: {source}")

    update_urls = _ensure_urls(UPDATES_URL, UPDATES_PATH, force=force)
    yield from thread_map(
        _download_updates,
        update_urls,
        desc="Downloading PubMed updates",
        leave=False,
    )
[docs]
def process_updates(*, force_process: bool = False) -> list[Article]:
    """Ensure all update files, process them, and return the articles as a list."""
    articles = iterate_process_updates(force_process=force_process)
    return list(articles)
def _ensure_grounders(
    ror_grounder: ssslm.Grounder | None = None,
    mesh_grounder: ssslm.Grounder | None = None,
    author_grounder: ssslm.Grounder | None = None,
) -> tuple[ssslm.Grounder, ssslm.Grounder, ssslm.Grounder]:
    """Fill in a default for each grounder that was not given.

    The packages providing the defaults are imported lazily, only when the
    corresponding grounder is missing.

    :returns: The ROR, MeSH, and author grounders, in that order.
    """
    if ror_grounder is None:
        from pyobo import get_grounder as get_ror_grounder

        logger.info("getting ROR grounder")
        ror_grounder = get_ror_grounder("ror")
        logger.info("done getting ROR grounder")

    if mesh_grounder is None:
        from pyobo import get_grounder as get_mesh_grounder

        logger.info("getting MeSH grounder")
        mesh_grounder = get_mesh_grounder("mesh")
        logger.info("done getting MeSH grounder")

    if author_grounder is None:
        from orcid_downloader.lexical import get_orcid_grounder

        logger.info("getting ORCiD grounder")
        author_grounder = get_orcid_grounder()
        logger.info("done getting ORCiD grounder")

    return ror_grounder, mesh_grounder, author_grounder
[docs]
def iterate_process_updates(
    *,
    force_process: bool = False,
    multiprocessing: bool = False,
    ror_grounder: ssslm.Grounder | None = None,
    mesh_grounder: ssslm.Grounder | None = None,
    author_grounder: ssslm.Grounder | None = None,
    force_listing: bool = False,
    source: Source | None = None,
) -> Iterable[Article]:
    """Ensure all update files, then iterate over the articles in them.

    :param force_process: If true, re-parse the XML even if a processed cache exists.
    :param multiprocessing: If true, process the files in parallel worker processes.
    :param ror_grounder: Optional grounder, passed through to the parser.
    :param mesh_grounder: Optional grounder, passed through to the parser.
    :param author_grounder: Optional grounder, passed through to the parser.
    :param force_listing: Passed as ``force`` when ensuring the update files.
    :param source: Where the list of update files comes from.
    """
    update_paths = ensure_updates(force=force_listing, source=source)
    return _shared_process(
        update_paths,
        unit="update",
        multiprocessing=multiprocessing,
        force_process=force_process,
        ror_grounder=ror_grounder,
        mesh_grounder=mesh_grounder,
        author_grounder=author_grounder,
    )
[docs]
def process_articles(
    *,
    force_process: bool = False,
    multiprocessing: bool = False,
    force_listing: bool = False,
    source: Source | None = None,
) -> list[Article]:
    """Ensure and process all articles, returning them as a list.

    See :func:`iterate_process_articles` for the meaning of the arguments.
    """
    articles = iterate_process_articles(
        force_process=force_process,
        multiprocessing=multiprocessing,
        force_listing=force_listing,
        source=source,
    )
    return list(articles)
[docs]
def iterate_process_articles(
    *,
    force_process: bool = False,
    ror_grounder: ssslm.Grounder | None = None,
    mesh_grounder: ssslm.Grounder | None = None,
    author_grounder: ssslm.Grounder | None = None,
    multiprocessing: bool = False,
    force_listing: bool = False,
    source: Source | None = None,
    ground: bool = True,
) -> Iterable[Article]:
    """Ensure and process articles from the update files, then the baseline files.

    :param force_process: If true, re-parse the XML even if a processed cache exists.
    :param ror_grounder: Optional grounder; a default is loaded if missing and ``ground`` is true.
    :param mesh_grounder: Optional grounder; a default is loaded if missing and ``ground`` is true.
    :param author_grounder: Optional grounder; a default is loaded if missing and ``ground`` is true.
    :param multiprocessing: If true, process the files in parallel worker processes.
    :param force_listing: Passed as ``force`` when ensuring the files.
    :param source: Where the list of files comes from.
    :param ground: If false, no grounding is done and any given grounders are ignored.
    :yields: Articles from the update files followed by articles from the baseline files.
    """
    if ground:
        ror_grounder, mesh_grounder, author_grounder = _ensure_grounders(
            ror_grounder, mesh_grounder, author_grounder
        )
    else:
        ror_grounder, mesh_grounder, author_grounder = None, None, None

    # both passes take exactly the same arguments
    shared_kwargs: dict[str, Any] = {
        "force_process": force_process,
        "ror_grounder": ror_grounder,
        "mesh_grounder": mesh_grounder,
        "author_grounder": author_grounder,
        "multiprocessing": multiprocessing,
        "force_listing": force_listing,
        "source": source,
    }
    yield from iterate_process_updates(**shared_kwargs)
    yield from iterate_process_baselines(**shared_kwargs)
[docs]
def iterate_ensure_articles(*, force: bool = False, source: Source | None = None) -> Iterable[Path]:
    """Ensure article files, yielding the update files first, then the baseline files."""
    for iterate_ensure in (iterate_ensure_updates, iterate_ensure_baselines):
        yield from iterate_ensure(force=force, source=source)
def _process_xml_gz(
    path: Path,
    *,
    ror_grounder: ssslm.Grounder | None,
    mesh_grounder: ssslm.Grounder | None,
    author_grounder: ssslm.Grounder | None,
    force_process: bool = False,
    n_workers: int | None = None,
) -> Iterable[Article]:
    """Process one XML file and return all of its articles as a list.

    This is the eager counterpart of :func:`_iterate_process_xml_gz`.
    """
    articles = _iterate_process_xml_gz(
        path=path,
        n_workers=n_workers,
        force_process=force_process,
        ror_grounder=ror_grounder,
        mesh_grounder=mesh_grounder,
        author_grounder=author_grounder,
    )
    return list(articles)
def _iterate_process_xml_gz(
    path: Path,
    *,
    ror_grounder: ssslm.Grounder | None,
    mesh_grounder: ssslm.Grounder | None,
    author_grounder: ssslm.Grounder | None,
    force_process: bool = False,
    n_workers: int | None = None,
) -> Iterable[Article]:
    """Process an XML file, cache a JSON Lines version, and iterate over its articles.

    :param path: The path to the ``.xml.gz`` file.
    :param ror_grounder: Optional grounder, passed through to the parser.
    :param mesh_grounder: Optional grounder, passed through to the parser.
    :param author_grounder: Optional grounder, passed through to the parser.
    :param force_process: If true, re-parse the XML even if the cache file exists.
    :param n_workers: The number of worker processes, if this runs inside one.
        It is used to choose the row of the per-file progress bar.
    :yields: The articles, read from the ``.jsonl.gz`` cache next to ``path``
        if it exists, and otherwise parsed from the XML while writing the cache.
    """
    if n_workers:
        # Worker identity is 1-based
        worker_id = mp.current_process()._identity[0] - 1
        # add one back in since there's an outer progress bar that should always
        # be in the first position
        position = 1 + worker_id % n_workers
    else:
        position = None

    new_name = path.stem.removesuffix(".xml")
    new_path = path.with_stem(new_name).with_suffix(".jsonl.gz")

    if new_path.is_file() and not force_process:
        # be explicit about the encoding so reading does not depend on the
        # platform's default
        with gzip.open(new_path, mode="rt", encoding="utf-8") as file:
            for line in file:
                yield Article.model_validate_json(line)
        return

    # write everything to a temporary path, then move it to the desired path
    # at the end. this makes it so if we cut off work in the middle, then we
    # don't get weirdly out of sync
    new_tmp_path = path.with_stem(new_name + "-tmp").with_suffix(".jsonl.gz")
    # the JSON is written with ensure_ascii=False, so the file has to be
    # opened as UTF-8 rather than with the platform's default encoding
    with (
        logging_redirect_tqdm(),
        gzip.open(new_tmp_path, mode="wt", encoding="utf-8") as file,
    ):
        for model in _parse_from_path(
            path,
            ror_grounder=ror_grounder,
            mesh_grounder=mesh_grounder,
            author_grounder=author_grounder,
            position=position,
        ):
            file.write(
                model.model_dump_json(
                    exclude_none=True, exclude_defaults=True, ensure_ascii=False
                )
                + "\n"
            )
            yield model
    # replace() overwrites an existing cache on every platform, whereas
    # rename() fails on Windows when re-processing with force_process
    new_tmp_path.replace(new_path)
def get_edges(*, force_process: bool = False, **kwargs: Any) -> list[Triple]:
    """Get edges from PubMed, using the cached edges file when possible.

    :param force_process: If true, regenerate the edges even if the cache file exists.
    :param kwargs: Passed through to :func:`iterate_edges`.
    """
    if force_process or not EDGES_PATH.is_file():
        triples = list(iterate_edges(force_process=force_process, **kwargs))
        write_triples(triples, EDGES_PATH)
        return triples
    return read_triples(EDGES_PATH)
def iterate_edges(**kwargs: Any) -> Iterable[Triple]:
    """Iterate over the triples of every processed article.

    :param kwargs: Passed through to :func:`iterate_process_articles`.
    """
    yield from itt.chain.from_iterable(
        article._triples() for article in iterate_process_articles(**kwargs)
    )
def save_sssom(*, path: str | Path | TextIO | None = None, **kwargs: Any) -> None:
    """Write one row per article cross-reference.

    Each row has the article's PubMed CURIE followed by the CURIE of the xref.

    :param path: The output location. Defaults to the module's SSSOM path.
    :param kwargs: Passed through to :func:`iterate_process_articles`.
    """
    target = SSSOM_PATH if path is None else path
    with safe_open_writer(target) as writer:
        for article in iterate_process_articles(**kwargs):
            subject_curie = f"pubmed:{article.pubmed}"
            for xref in article.xrefs:
                row = (subject_curie, xref.curie)
                writer.writerow(row)
@click.command(name="articles")
@click.option(
    "-f",
    "--force-process",
    is_flag=True,
    help="If given, re-processes the articles. Does not re-download.",
)
@click.option("-m", "--multiprocessing", is_flag=True, help="Should multiprocessing get used?")
@verbose_option
@click.option(
    "--source",
    type=click.Choice(list(typing.get_args(Source))),
    default="remote",
    help="Choose where the index of files comes from. Remote means that the NCBI FTP server is "
    "queried, which is flaky. Local means a previous cache of the index is used.",
)
@click.option(
    "--ground/--no-ground",
    is_flag=True,
    default=True,
    show_default=True,
    help="Should MeSH terms, organizations, and authors get grounded?",
)
@click.option(
    "--clear",
    is_flag=True,
    help="Delete cached JSON/JSONL files for articles, then exit without doing any processing. "
    "Useful for cleaning up and iteratively developing the processor.",
)
def _main(
    force_process: bool, multiprocessing: bool, source: Source | None, ground: bool, clear: bool
) -> None:
    """Download and process articles."""
    if not clear:
        # run the pipeline for its side effect of writing the caches
        for _ in iterate_process_articles(
            force_process=force_process,
            multiprocessing=multiprocessing,
            source=source,
            ground=ground,
        ):
            pass
        return

    click.secho("deleting cached JSON/JSONL files", fg="yellow")
    click.confirm("Are you sure you want to delete the cached JSON/JSONL files?", abort=True)
    # collect the paths up front so the progress bar knows the total
    cached_paths = [
        cached_path
        for module in (BASELINE_MODULE, UPDATES_MODULE)
        for pattern in ("*.json.gz", "*.jsonl.gz")
        for cached_path in module.base.glob(pattern)
    ]
    for path in tqdm(cached_paths):
        tqdm.write(f"deleting {path}")
        path.unlink()
    sys.exit(0)
# Run the command line interface when this module is executed as a script.
if __name__ == "__main__":
    _main()