Source code for ckanext.kata.converters

# pylint: disable=unused-argument

"""
Functions to convert dataset form fields from or to db fields.
"""
import json
import pycountry

from pylons import h

import re
import logging
from pylons.i18n import _
from ckan.lib.navl.dictization_functions import missing

from ckan.logic.action.create import related_create
from ckan.model import Related, Session, Group, repo
from ckan.model.authz import setup_default_user_roles

from ckanext.kata import settings, utils

log = logging.getLogger('ckanext.kata.converters')


[docs]def org_auth_to_extras(key, data, errors, context): ''' Convert author and organization to extras ''' extras = data.get(('extras',), []) if not extras: data[('extras',)] = extras if len(data[key]) > 0: if key[0] == 'orgauth': if (not ('orgauth', key[1], 'org') in data or len(data[('orgauth', key[1], 'org')]) == 0): errors[key].append(_('Organisation is missing')) if (not ('orgauth', key[1], 'value') in data or len(data[('orgauth', key[1], 'value')]) == 0): errors[key].append(_('Author is missing')) oval = data[(key[0], key[1], 'org')] extras.append({'key': "author_%s" % key[1], 'value': data[key]}) extras.append({'key': 'organization_%s' % key[1], 'value': oval })
[docs]def org_auth_from_extras(key, data, errors, context): ''' Convert (author, organization) pairs from `package.extra` to `orgauths` dict :param key: key :param data: data :param errors: validation errors :param context: context ''' orgauths = data.get(('orgauth',), []) if not orgauths: data[('orgauth',)] = orgauths authors = [] orgs = [] for k in data.keys(): if 'extras' in k and 'key' in k: if re.search('^(author_)\d+$', data[k]): val = data[(k[0], k[1], 'value')] author = {'key': data[k], 'value': val} if author not in authors: authors.append(author) data.pop((k[0], k[1], 'value'), None) data.pop((k[0], k[1], '__extras'), None) data.pop(k, None) continue if re.search('^(organization_)\d+$', data[k]): val = data[(k[0], k[1], 'value')] org = {'key': data[k], 'org': val} if org not in orgs: orgs.append(org) data.pop((k[0], k[1], 'value'), None) data.pop((k[0], k[1], '__extras'), None) data.pop(k, None) orgs = sorted(orgs, key=lambda ke: int(ke['key'].rsplit('_', 1)[1])) authors = sorted(authors, key=lambda ke: int(ke['key'].rsplit('_', 1)[1])) for org, author in zip(orgs, authors): orgauth = {} orgauth.update({'value': author['value']}) orgauth.update({'org': org['org']}) if not orgauth in orgauths: orgauths.append(orgauth)
[docs]def ltitle_to_extras(key, data, errors, context): ''' Convert title & language pair from dataset form to db format and validate. Title & language pairs will be stored in package_extra. :param key: key :param data: data :param errors: validation errors :param context: context ''' extras = data.get(('extras',), []) if not extras: data[('extras',)] = extras if len(data[key]) > 0: # Get title's language from data dictionary. key[0] == 'title'. lval = data[(key[0], key[1], 'lang')] extras.append({'key': "title_%s" % key[1], 'value': data[key]}) extras.append({'key': 'lang_title_%s' % key[1], 'value': lval })
[docs]def ltitle_from_extras(key, data, errors, context): ''' Convert all title & language pairs from db format to dataset form format. :param key: key :param data: data :param errors: validation errors :param context: context ''' langtitles = data.get(('langtitle',), []) if not langtitles: data[('langtitle',)] = langtitles titles = [] langs = [] for k in data.keys(): if 'extras' in k and 'key' in k: if re.search('^(title_|ltitle)\d+$', data[k]): val = data[(k[0], k[1], 'value')] title = {'key': data[k], 'value': val} if title not in titles: titles.append(title) data.pop((k[0], k[1], 'value'), None) data.pop((k[0], k[1], '__extras'), None) data.pop(k, None) continue if re.search('^(lsel|lang_title_)\d+$', data[k]): val = data[(k[0], k[1], 'value')] lang = {'key': data[k], 'lang': val} if lang not in langs: langs.append(lang) data.pop((k[0], k[1], 'value'), None) data.pop((k[0], k[1], '__extras'), None) data.pop(k, None) langs = sorted(langs, key=lambda ke: int(ke['key'].rsplit('_', 1)[1])) titles = sorted(titles, key=lambda ke: int(ke['key'].rsplit('_', 1)[1])) for lang, title in zip(langs, titles): langtitle = {} langtitle.update({'value': title['value']}) langtitle.update({'lang': lang['lang']}) if not langtitle in langtitles: langtitles.append(langtitle)
[docs]def add_to_group(key, data, errors, context): ''' Add a new group if it doesn't yet exist. :param key: key :param data: data :param errors: validation errors :param context: context ''' val = data.get(key) if val: repo.new_revision() grp = Group.get(val) # UI code needs group created if it does not match. Hence do so. if not grp: grp = Group(name=val, description=val, title=val) setup_default_user_roles(grp) grp.save() repo.commit()
[docs]def remove_disabled_languages(key, data, errors, context): ''' If `langdis == 'True'`, remove all languages. Expecting language codes in `data['key']`. :param key: key :param data: data :param errors: validation errors :param context: context ''' langdis = data.get(('langdis',)) langs = data.get(key).split(',') if langdis == 'False': # Language enabled if langs == [u'']: errors[key].append(_('No language given')) else: # Language disabled # Display flash message if user is loading a page. if 'session' in globals(): h.flash_notice(_("Language is disabled, removing languages: '%s'" % data[key])) # Remove languages. del data[key] data[key] = u''
[docs]def remove_access_application_new_form(key, data, errors, context): ''' If availability changes remove access_application_new_form. Expecting string: `True` or `False` in `data['key']`. :param key: key :param data: data :param errors: validation errors :param context: context ''' if data.get(('availability',)) != 'access_application': # Remove checkbox value. del data[key] data[key] = u''
[docs]def checkbox_to_boolean(key, data, errors, context): ''' Convert HTML checkbox's value ('on' / null) to boolean string ''' value = data.get(key, None) if value not in [u'True', u'False']: if value == u'on': data[key] = u'True' else: data[key] = u'False'
[docs]def update_pid(key, data, errors, context): ''' Replace an empty unicode string with random PID. ''' if type(data[key]) == unicode: if len(data[key]) == 0: data[key] = utils.generate_pid()
[docs]def convert_from_extras_kata(key, data, errors, context): ''' Convert all extras fields from extras to data dict and remove fields from extras. Removal helps counter IndexError with unflatten after validation. :param key: key :param data: data :param errors: validation errors :param context: context ''' for k in data.keys(): if k[0] == 'extras' and k[-1] == 'key' and data[k] in settings.KATA_FIELDS: key = ''.join(data[k]) data[(key,)] = data[(k[0], k[1], 'value')] for _remove in data.keys(): if _remove[0] == 'extras' and _remove[1] == k[1]: del data[_remove]
[docs]def convert_to_extras_kata(key, data, errors, context): ''' Convert one extras fields from extras to data dict and remove fields from extras. Removal helps counter IndexError with unflatten after validation. :param key: key :param data: data :param errors: validation errors :param context: context ''' extras = data.get(('extras',), []) if not extras: data[('extras',)] = extras extras.append({'key': key[-1], 'value': data[key]})
[docs]def xpath_to_extras(key, data, errors, context): ''' Convert xpaths to extras :param key: key :param data: data :param errors: validation errors :param context: context ''' extras = data.get(('extras',), []) if not extras: data[('extras',)] = extras for k, v in data[key].iteritems(): extras.append({'key': k, 'value': v})
[docs]def convert_languages(key, data, errors, context): ''' Convert ISO 639 language abbreviations to ISO 639-2 T. data['key'] may be a string with comma separated values or a single language code. :param key: key :param data: data :param errors: validation errors :param context: context ''' value = data.get(key) if not isinstance(value, basestring): return langs = value.split(',') new_langs = [] for lang in langs: lang = lang.strip() if lang: try: pycountry.languages.get(terminology=lang) new_langs.append(lang) except KeyError: try: # Convert two character language codes lang_object = pycountry.languages.get(alpha2=lang) new_langs.append(lang_object.terminology) except KeyError as ke: errors[key].append(_('Language %s not in ISO 639-2 T format') % lang) # We could still try to convert from ISO 639-2 B if it shows up somewhere if new_langs: data[key] = ', '.join(new_langs)
[docs]def from_extras_json(key, data, errors, context): ''' Convert a field from JSON format in extras to data_dict. The `key` parameter is the field where to save values, so we need to search data_dict to find the correct value which we are converting. :param key: key for example `('pids',)` :param data: data, contains value somewhere, like `('extras', 5, 'value')` :param errors: validation errors :param context: context ''' for k in data.keys(): if k[0] == 'extras' and k[-1] == 'key' and data[k] == key[0]: data[key] = json.loads(data[(k[0], k[1], 'value')]) data.pop(('extras', k[1], 'key'), None) data.pop(('extras', k[1], 'value'), None) data.pop(('extras', k[1], '__extras'), None)
[docs]def to_extras_json(key, data, errors, context): ''' Convert a field to JSON format and store in extras. ''' extras = data.get(('extras',), []) if not extras: data[('extras',)] = extras extras.append({'key': key[0], 'value': json.dumps(data[key])})
[docs]def flattened_to_extras(key, data, errors, context): ''' Convert a flattened key-value pair from data_dict to extras. For example `(pids, 0, provider)` -> `extras['pids_0_provider']`. .. note:: The key-value pairs to convert are before CKAN's flattening in format pids: [{ id: 'some pid', type: 'data', ...},{ id: 'other pid', type: 'data', ...}] :param key: key, for example `(pids, 0, provider)` :param data: data :param errors: validation errors :param context: context ''' extras = data.get(('extras',), []) if not extras: data[('extras',)] = extras if data[key]: extras.append({'key': "%s_%s_%s" % key, 'value': data[key]})
[docs]def flattened_from_extras(key, data, errors, context): ''' Convert a whole bunch of flattened key-value pairs from extras to a list of dicts in data_dict. Format in extras must be like `key[0]_index_innerkey`. For example: `pids_02_provider`. :param key: The key to convert as tuple, for example `('pids',)` :param data: data :param errors: validation errors :param context: context ''' destination_key = key[0] # For example 'pids' # make destination a pointer to data-dict destination = data.get((destination_key,), []) if destination is missing: destination = [] if not destination: data[(destination_key,)] = destination for k in data.keys(): if k[0] == 'extras' and k[2] == 'key': extras_key = data[k].split('_') extras_value = data[(k[0], k[1], 'value')] if extras_key[0] == destination_key: index = int(extras_key[1]) destination += [{} for i in range(index - len(destination) + 1)] # pad destination list with {}'s destination[index].update({extras_key[2]: extras_value}) data.pop((k[0], k[1], 'key'), None) data.pop((k[0], k[1], 'value'), None) data.pop((k[0], k[1], '__extras'), None)