Source code for hepdata.modules.submission.api
#
# This file is part of HEPData.
# Copyright (C) 2016 CERN.
#
# HEPData is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation; either version 2 of the
# License, or (at your option) any later version.
#
# HEPData is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with HEPData; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA.
#
import logging
from invenio_db import db
from hepdata.modules.submission.models import DataResource, SubmissionObserver
from hepdata.modules.permissions.models import SubmissionParticipant
from hepdata.modules.submission.models import HEPSubmission
"""Common utilities used across the code base."""
logging.basicConfig()
log = logging.getLogger(__name__)
[docs]
def is_resource_added_to_submission(recid, version, resource_url):
"""
Returns if a submission already has the given resource url
:param recid:
:param version:
:param resource_url:
:return:
"""
return HEPSubmission.query.filter(HEPSubmission.publication_recid == recid,
HEPSubmission.version == version,
HEPSubmission.resources.any(
DataResource.file_location == resource_url)).count() > 0
[docs]
def get_latest_hepsubmission(*args, **kwargs):
"""
Gets the latest HEPSubmission record matching the given kwargs
:return: the HEPSubmission object or None
"""
hepsubmissions = HEPSubmission.query.filter_by(**kwargs).all()
last = None
for hepsubmission in hepsubmissions:
if last is None:
last = hepsubmission
else:
if hepsubmission.version > last.version:
last = hepsubmission
return last
[docs]
def get_submission_participants_for_record(publication_recid, roles=None, **kwargs):
"""Gets the participants for a given publication record id
:param int publication_recid: publication_recid of a submission.
:param ``**kwargs``: Additional filter parameters to pass to `filter_by`.
:return: List of participants relating to that record
:rtype: list[SubmissionParticipant]
"""
query = SubmissionParticipant.query.filter_by(
publication_recid=publication_recid,
**kwargs
)
if roles:
query = query.filter(SubmissionParticipant.role.in_(roles))
return query.all()
[docs]
def get_primary_submission_participants_for_record(publication_recid):
submission_participants = get_submission_participants_for_record(publication_recid, status="primary")
return submission_participants
[docs]
def get_or_create_submission_observer(publication_recid, regenerate=False):
"""
Gets or re/generates a SubmissionObserver key for a given recid.
Where an observer does not exist for a recid (with existing sub),
it is created and returned instead.
:param publication_recid: The publication record id
:param regenerate: Whether to regenerate/force generate the key
:return: SubmissionObserver key, created, or None
"""
submission_observer = SubmissionObserver.query.filter_by(publication_recid=publication_recid).first()
created = False
if submission_observer is None:
submission = get_latest_hepsubmission(publication_recid=publication_recid)
if submission:
if submission.overall_status == "todo" or regenerate:
submission_observer = SubmissionObserver(publication_recid=publication_recid)
created = True
else:
# No submission, no observer, return None
return None
# If we are to regenerate, and SubmissionObserver was queried and not generated.
# If just created, we don't need to generate anything.
if not created and regenerate:
submission_observer.generate_observer_key()
# Only commit if we have created or regenerated
if created or regenerate:
db.session.add(submission_observer)
db.session.commit()
return submission_observer
[docs]
def delete_submission_observer(recid):
"""
Deletes a SubmissionObserver object from the database
based on a given recid value.
:param: recid: int - The recid to delete on
"""
# Validate recid is an integer
try:
recid = int(recid)
except (ValueError, TypeError) as e:
log.error(f"Invalid recid provided for observer deletion: {recid}")
raise ValueError(f"Supplied recid value ({recid}) for deletion is not an Integer.") from e
try:
submission_observer = SubmissionObserver.query.filter_by(publication_recid=recid).first()
if submission_observer:
db.session.delete(submission_observer)
db.session.commit()
log.info(f"Deleted observer for submission {recid}")
except Exception as e:
log.error(f"Error deleting observer for submission {recid}: {e}")
db.session.rollback()
raise