# coding=utf-8
'''
Template helpers for Kata CKAN extension.
'''
from pylons import config
import functionally as fn
import ckan.model as model
from ckan.model import Related, Package, User
from ckan.lib.base import g, h
from ckan.logic import get_action, ValidationError
from ckanext.kata import settings
[docs]def has_agents_field(data_dict, field):
'''Return true if some of the data dict's agents has attribute given in field.'''
return [] != filter(lambda x : x.get(field), data_dict.get('agent', []))
[docs]def reference_update(ref):
#@beaker_cache(type="dbm", expire=2678400)
def cached_url(url):
return url
return cached_url(ref)
[docs]def get_dict_field_errors(errors, field, index, name):
'''Get errors correctly for fields that are represented as nested dict fields in data_dict.
:param errors: the error dict
:param field: field name
:param index: index, eg. agents are repeatable -> a certain agent might lie in index 2
:param name:
:returns: `[u'error1', u'error2']`
'''
error = []
error_dict = errors.get(field)
try:
if error_dict and error_dict[index]:
error = error_dict[index].get(name)
except IndexError:
pass
return error
[docs]def get_package_ratings_for_data_dict(data_dict):
'''
Create a metadata rating (1-5) for given data_dict.
This is the same as :meth:`get_package_ratings` but can be used
for getting metadata ratings e.g. for search results where
only raw data_dicts are available rather than already-converted
package dicts.
:param data_dict: A CKAN data_dict
'''
from ckanext.kata.schemas import Schemas # Importing here prevents circular import
context = {
'model': model,
'schema': Schemas.show_package_schema()
}
try:
pkg_dict = get_action('package_show')(context, data_dict)
except ValidationError:
return (0, u'○○○○○')
return get_package_ratings(pkg_dict)
[docs]def get_package_ratings(data):
'''
Create a metadata rating (1-5) for given dataset
:param data: A CKAN data_dict
:returns: `(rating, stars)`
'''
score = 0 # Scale 0-49
required_fields =['pids', 'version', 'contact', 'license_id', 'agent', 'language', 'availability']
if all(data.get(field) for field in required_fields):
score += 2
# MAX 2
pid_types = [pid.get('type') for pid in data.get('pids', [])]
pid_types_expected = ['data', 'metadata', 'version']
if len(pid_types) < 3:
# The minimum metadata model is a bit vague in this part, this is one iterpretation
pid_types_expected.pop(2)
if all(pid_type in pid_types for pid_type in pid_types_expected):
score += 2 * len(pid_types) if len(pid_types) < 3 else 6
if len(unicode(data.get('version', ''))) > 15: # ISO8601 datetime
score += 1
# MAX 9
if data.get('license_id', '') not in ['notspecified', '']:
score += 6
if not (data.get('tags') or data.get('tag_string')): # Either of these should be present
score -= 5 # MINUS
if len(data.get('agent', [])) >= 2:
score += 2 * len(data['agent']) if len(data['agent']) < 6 else 6
if len(data.get('event', [])) >= 1:
score += 1
if get_funder(data):
score += 6
# MAX 28
if len(unicode(data.get('notes', ''))) >= 10:
score += (len(data['notes']) / 10) if len(data['notes']) < 60 else 6
required_fields = ['geographic_coverage', 'event', 'checksum', 'algorithm', 'mimetype', 'langtitle']
score += len(filter(lambda field: data.get(field), required_fields))
# MAX 40
if filter(lambda con: con.get('name') and con.get('email') and con.get('URL') and con.get('phone'), data.get('contact')):
score += 4
# MAX 44
if data.get('temporal_coverage_begin') and data.get('temporal_coverage_end'):
score += 1
if data.get('discipline'):
score += 4
# MAX 49
rating = 1 + int(score / 10)
stars = u'●●●●●'[:rating] + u'○○○○○'[rating:] # Star rating as string
return (rating, stars)
[docs]def get_if_url(data):
'''
Try to guess if data is sufficient type for rdf:about
:param data: the data to check out
:rtype: boolean
'''
if data and (data.startswith('http://') or data.startswith('https://') or \
data.startswith('urn:')):
return True
return False
[docs]def string_to_list(data):
'''
Split languages and make it a list for Genshi (read.rdf)
:param data: the string to split
:rtype: list
'''
if data:
return data.split(", ")
return ''
[docs]def get_first_admin(id):
'''
Get the url of the first one with an admin role
:param id: the package id
:returns: profile url
:rtype: string
'''
pkg = Package.get(id)
if pkg:
data = pkg.as_dict()
user = None
if pkg.roles:
owner = [role for role in pkg.roles if role.role == 'admin']
if len(owner):
user = User.get(owner[0].user_id)
profileurl = ""
if user:
profileurl = config.get('ckan.site_url', '') + \
h.url_for(controller="user", action="read",
id=user.name)
return profileurl
return False
[docs]def get_rightscategory(license):
'''
Return rightscategory based on license id
:returns: LICENSED, COPYRIGHTED or PUBLIC DOMAIN
'''
if license == "other_closed":
return "COPYRIGHTED"
if license == "cc-zero" or license == "cc-by" or license == "cc-by-4.0":
return "LICENSED"
# Can not recognise the license:
return "OTHER"
[docs]def get_authors(data_dict):
'''Get all authors from agent field in data_dict'''
return filter(lambda x: x.get('role') == u'author', data_dict.get('agent', []))
[docs]def get_distributor(data_dict):
'''Get a single distributor from agent field in data_dict'''
return fn.first(filter(lambda x: x.get('role') == u'distributor', data_dict.get('agent', [])))
[docs]def get_owner(data_dict):
'''Get a single owner from agent field in data_dict'''
return fn.first(filter(lambda x: x.get('role') == u'owner', data_dict.get('agent', [])))
[docs]def resolve_agent_role(role):
'''
Get a non-translated role name.
'''
return settings.AGENT_ROLES.get(role, None)
[docs]def get_funder(data_dict):
'''Get a single funder from agent field in data_dict'''
return fn.first(get_funders(data_dict))
[docs]def get_funders(data_dict):
'''Get all funders from agent field in data_dict'''
# return filter(lambda x: x.get('role') == u'funder', data_dict.get('agent', []))
# TODO: Fix validators to not create empty agents
return filter(lambda x: x.get('role') == u'funder' and \
(x.get('name') or x.get('id') or x.get('URL') or x.get('organisation')),
data_dict.get('agent', []))