# Source code for ckanext.oaipmh.cmdi_reader

from lxml import etree
from urlparse import urlparse
from ckanext.kata.utils import datapid_to_name
from ckanext.oaipmh.importcore import generic_xml_metadata_reader
import oaipmh.common
from functionally import first
from pylons import config


class CmdiReaderException(Exception):
    """ Reader exception is thrown on unexpected data or error. """
class CmdiReader(object):
    """ Reader for CMDI XML data.

    Instances are callable: ``reader(xml)`` is the same as ``reader.read(xml)``.
    """

    # Prefix -> namespace URI map passed to every XPath query.
    namespaces = {'oai': "http://www.openarchives.org/OAI/2.0/", 'cmd': "http://www.clarin.eu/cmd/"}

    def __init__(self, provider=None):
        """ Generate new reader instance.

        :param provider: URL used for pids. Falls back to the ``ckan.site_url`` config value when not given.
        """
        super(CmdiReader, self).__init__()
        self.provider = provider or config.get('ckan.site_url')

    def __call__(self, xml):
        """ Call :meth:`CmdiReader.read`. """
        return self.read(xml)

    @classmethod
    def _text_xpath(cls, root, query):
        """ Select list of texts and strip results. Use text() suffix in Xpath `query`.

        :param root: parent element (lxml) where selection is made.
        :param query: Xpath query used to get data
        :return: list of strings
        """
        return [unicode(text).strip() for text in root.xpath(query, namespaces=cls.namespaces)]

    @staticmethod
    def _to_name(identifier):
        """ Convert identifier to CKAN package name.

        When the identifier is a full URL (has both scheme and host) only its
        path, without surrounding slashes, is used.

        :param identifier: identifier string
        :return: CKAN package name
        """
        parsed = urlparse(identifier)
        if parsed.scheme and parsed.netloc:
            identifier = parsed.path.strip('/')
        return datapid_to_name(identifier)

    @staticmethod
    def _strip_first(elements):
        """ Strip and return first element.

        :param elements: list of strings (e.g. the result of a ``text()`` Xpath query)
        :return: first element stripped, or empty string when there is none
        """
        # Builtins only: take the first item, treating "no item" and falsy items as "".
        return (next(iter(elements), None) or "").strip()

    @classmethod
    def _get_organizations(cls, root, xpath):
        """ Extract organization dictionaries from XML using given Xpath.

        :param root: parent element (lxml) where selection is done.
        :param xpath: xpath selector used to get data
        :return: list of organization dictionaries with keys
                 ``role``, ``name``, ``short_name``, ``email`` and ``url``
        """
        def text_of(node, query):
            # First matching text node below `node`, stripped ("" when missing).
            return cls._strip_first(node.xpath(query, namespaces=cls.namespaces))

        return [{'role': text_of(organization, "cmd:role/text()"),
                 'name': text_of(organization, "cmd:organizationInfo/cmd:organizationName/text()"),
                 'short_name': text_of(organization, "cmd:organizationInfo/cmd:organizationShortName/text()"),
                 'email': text_of(organization, "cmd:organizationInfo/cmd:communicationInfo/cmd:email/text()"),
                 # Read the url element; this previously repeated the email query.
                 'url': text_of(organization, "cmd:organizationInfo/cmd:communicationInfo/cmd:url/text()")}
                for organization in root.xpath(xpath, namespaces=cls.namespaces)]

    @classmethod
    def _get_persons(cls, root, xpath):
        """ Extract person dictionaries from XML using given Xpath.

        :param root: parent element (lxml) where selection is done
        :param xpath: xpath selector used to get data
        :return: list of person dictionaries with keys ``role``, ``surname``,
                 ``given_name``, ``email`` and ``organization`` (first affiliation or None)
        """
        def text_of(node, query):
            # First matching text node below `node`, stripped ("" when missing).
            return cls._strip_first(node.xpath(query, namespaces=cls.namespaces))

        return [{'role': text_of(person, "cmd:role/text()"),
                 'surname': text_of(person, "cmd:personInfo/cmd:surname/text()"),
                 'given_name': text_of(person, "cmd:personInfo/cmd:givenName/text()"),
                 'email': text_of(person, "cmd:personInfo/cmd:communicationInfo/cmd:email/text()"),
                 'organization': first(cls._get_organizations(person, "cmd:personInfo/cmd:affiliation"))}
                for person in root.xpath(xpath, namespaces=cls.namespaces)]

    @staticmethod
    def _get_person_name(person):
        """ Generate name from person dictionary.

        :param person: person dictionary
        :return: name of the person ("<given_name> <surname>")
        """
        return u"%s %s" % (person['given_name'], person['surname'])

    @classmethod
    def _persons_as_contact(cls, persons):
        """ Convert person dictionaries to contact dictionaries.

        :param persons: list of person dictionaries
        :return: list of contact dictionaries
        """
        # 'organization' may be missing or None, hence the `or {}` fallback.
        return [{'name': cls._get_person_name(person),
                 'url': (person.get('organization', None) or {}).get('url', ""),
                 'email': person['email'],
                 'phone': ""}
                for person in persons]

    @staticmethod
    def _organization_as_agent(organizations, agent_role):
        """ Convert organization dictionaries to agent dictionaries.

        :param organizations: list of organization dictionaries
        :param agent_role: name of the role
        :return: list of agent dictionaries
        """
        return [{'name': "",
                 'organisation': organization.get('name', ""),
                 'role': agent_role}
                for organization in organizations]

    @classmethod
    def _persons_as_agent(cls, persons, agent_role):
        """ Convert person dictionaries to agent dictionaries.

        :param persons: list of person dictionaries
        :param agent_role: name of the role
        :return: list of agent dictionaries
        """
        # 'organization' may be missing or None, hence the `or {}` fallback.
        return [{'name': cls._get_person_name(person),
                 'organisation': (person.get('organization', None) or {}).get('name', ""),
                 'role': agent_role}
                for person in persons]

    def read(self, xml):
        """ Extract package data from given XML.

        :param xml: xml element (lxml)
        :return: oaipmh.common.Metadata object generated from xml
        """
        result = generic_xml_metadata_reader(xml).getMap()
        result['unified'] = self.read_data(xml)
        return oaipmh.common.Metadata(result)

    def read_data(self, xml):
        """ Extract package data from given XML.

        :param xml: xml element (lxml)
        :return: dictionary
        :raises CmdiReaderException: when the CMD element, the resourceInfo
                                     element or a data identifier is missing
        """
        namespaces = self.namespaces

        metadata_identifiers = self._text_xpath(xml, '//oai:record/oai:header/oai:identifier/text()')

        cmd = first(xml.xpath('//oai:record/oai:metadata/cmd:CMD', namespaces=namespaces))
        if cmd is None:
            raise CmdiReaderException("Unexpected XML format: No CMD -element found")

        # Use first() instead of [0] so that a missing element reaches the
        # check below instead of raising IndexError.
        resource_info = first(cmd.xpath("//cmd:Components/cmd:resourceInfo", namespaces=namespaces))
        if resource_info is None:
            raise CmdiReaderException("Unexpected XML format: No resourceInfo -element found")

        # NOTE(review): the queries below start with '//', which in XPath selects
        # from the document root rather than relative to the context element.
        # Confirm this is intended if a document can ever hold several records.
        languages = self._text_xpath(cmd, "//cmd:corpusInfo/cmd:corpusMediaType/cmd:corpusTextInfo/cmd:languageInfo/cmd:languageId/text()")
        data_identifiers = self._text_xpath(cmd, "//cmd:identificationInfo/cmd:identifier/text()")
        description = first(self._text_xpath(cmd, "//cmd:identificationInfo/cmd:description/text()"))

        # An empty <resourceName/> has text None; treat it as an empty title.
        titles = [{'lang': title.get('{http://www.w3.org/XML/1998/namespace}lang', ''),
                   'value': (title.text or "").strip()}
                  for title in xml.xpath('//cmd:identificationInfo/cmd:resourceName', namespaces=namespaces)]

        primary_pid = None
        provider = self.provider
        pids = [dict(id=pid, provider=provider, type='data') for pid in data_identifiers]
        # Every data pid containing 'urn' is flagged primary; the last one
        # found is used for the package name.
        for pid in pids:
            if 'urn' in pid.get('id', ""):
                pid['primary'] = "true"
                primary_pid = pid['id']
        pids += [dict(id=pid, provider=provider, type='metadata') for pid in metadata_identifiers]

        version = first(self._text_xpath(resource_info, "//cmd:metadataInfo/cmd:metadataLastDateUpdated/text()")) or ""

        # TODO: Check agent mapping. Person/organization elements not mapped yet:
        #   //cmd:distributionInfo/cmd:licenceInfo/cmd:licensorPerson
        #   //cmd:distributionInfo/cmd:licenceInfo/cmd:licensorOrganization
        #   //cmd:metadataInfo/cmd:metadataCreator
        contacts = self._persons_as_contact(self._get_persons(resource_info, "//cmd:contactPerson"))

        agents = []
        agents.extend(self._persons_as_agent(self._get_persons(resource_info, "//cmd:distributionInfo/cmd:iprHolderPerson"), 'author'))
        agents.extend(self._persons_as_agent(self._get_persons(resource_info, "//cmd:distributionInfo/cmd:licenceInfo/cmd:distributionRightsHolderPerson"), 'owner'))
        agents.extend(self._organization_as_agent(self._get_organizations(resource_info, "//cmd:distributionInfo/cmd:iprHolderOrganization"), 'author'))
        agents.extend(self._organization_as_agent(self._get_organizations(resource_info, "//cmd:distributionInfo/cmd:licenceInfo/cmd:distributionRightsHolderOrganization"), 'owner'))

        name_source = primary_pid or first(data_identifiers)
        if name_source is None:
            # Without any data identifier no package name can be derived.
            raise CmdiReaderException("Unexpected XML format: No identifier -element found")

        result = {'name': self._to_name(name_source),
                  'language': ",".join(languages),
                  'pids': pids,
                  'version': version,
                  'tag_string': 'cmdi',  # TODO: Ask about value!
                  'notes': description,
                  'langtitle': titles,
                  'type': 'dataset',
                  'contact': contacts,
                  'agent': agents,
                  'availability': 'contact_owner'}

        # TODO: Ask about distributionAccessMedium
        #   candidate sources: //cmd:distributionInfo/availability/text()
        #                      //cmd:identificationInfo/cmd:url/text()
        download_location = first(self._text_xpath(resource_info, "//cmd:distributionInfo/cmd:licenceInfo/cmd:downloadLocation/text()"))

        if download_location:
            result['through_provider_URL'] = download_location
            result['availability'] = 'through_provider'

        return result