bpeterso2000
8/7/2014 - 4:01 PM

Accepts a YAML file with field names, types & analyzers and generates an Elasticsearch JSON mapping file

# -*- coding: utf-8 -*-
"""
Simple Elasticsearch Field Mapping Template Generator
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Example YAML file
-----------------

::

    sandbox
    ---
    form_number:
        - lower_alphanum
        - lower_alphanum_prefix
    form_name:
        - lower_keyword
        - snowball
    revision_date:
        - date
    revision_number:
    revision_replaces:
    revision_replaced_by:
    revision_ready_for_publication:
        - boolean

* The first line of the YAML file must be the Elasticsearch doc-type
  (``sandbox`` in the example above); the field definitions follow the
  ``---`` document separator.
* A field with no entries is mapped as an unindexed string.
* An entry naming an Elasticsearch core type sets the field's type;
  every other entry is treated as an analyzer.

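Usage
-----

The script name and file names below are placeholders; the YAML may
also be piped in on stdin::

    $ python es_mapping.py fields.yaml > mapping.json

For the example above, the emitted mapping looks roughly like this
(abridged; key order of the inner objects may vary)::

    {
        "sandbox": {
            "properties": {
                "form_number": {
                    "type": "multi_field",
                    "fields": {
                        "form_number.lower_alphanum": {
                            "type": "string",
                            "analyzer": "lower_alphanum"
                        },
                        "form_number.lower_alphanum_prefix": {
                            "type": "string",
                            "analyzer": "lower_alphanum_prefix"
                        }
                    }
                },
                "revision_date": {
                    "type": "date"
                },
                ...
            }
        }
    }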
"""
import argparse
import json
import logging
import sys
from collections import OrderedDict

try:
    import yaml
except ImportError:
    sys.exit('Requires PyYAML. Try: pip install pyyaml')


ES_CORE_TYPES = ['string', 'boolean', 'integer', 'short', 'long', 'float',
                 'double', 'byte', 'binary', 'token_count', 'date']

# -----------------------------------------------------------------------------
# Load YAML into an OrderedDict
# reference: http://stackoverflow.com/questions/5121931


class OrderedLoader(yaml.SafeLoader):
    pass


def construct_mapping(loader, node):
    loader.flatten_mapping(node)
    return OrderedDict(loader.construct_pairs(node))


def yaml_ordered_load(stream):
    default_mapping_tag = yaml.resolver.BaseResolver.DEFAULT_MAPPING_TAG
    OrderedLoader.add_constructor(default_mapping_tag, construct_mapping)
    return yaml.load_all(stream, OrderedLoader)
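
# Example (a sketch): yaml_ordered_load('a: 1\nb: 2') yields a single
# OrderedDict([('a', 1), ('b', 2)]); documents separated by '---' are
# yielded lazily, in file order.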

# ----------------------------------------------------------------------------


def _get_next_document(documents, name, logger):
    try:
        return next(documents)
    except StopIteration:
        logger.warning('No {} found in the YAML file.'.format(name))
        sys.exit(1)


def _get_field_type(items, logger):
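    # Consume the leading entry when it names an Elasticsearch core
    # type; otherwise leave the items alone and default to 'string'.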
    try:
        if items[0] in ES_CORE_TYPES:
            return items.pop(0)
    except TypeError as error:
        logger.warning(error)
    return 'string'


def _get_analyzer(analyzer):
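    # 'no' and 'not_analyzed' are index settings rather than analyzers,
    # so map them to the 'index' property; any other value passes
    # through as a named analyzer.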
    if analyzer == 'no':
        return 'index', 'no'
    elif analyzer == 'not_analyzed':
        return 'index', 'not_analyzed'
    else:
        return 'analyzer', analyzer


def _get_field_name(field_name, analyzer):
    # The 'not_analyzed' variant keeps the bare field name so it acts
    # as the primary sub-field; analyzed variants get a suffix.
    if analyzer == 'not_analyzed':
        return field_name
    return '.'.join([field_name, analyzer])


def generate_mapping(documents):
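    """Translate the parsed YAML documents (doc type, then fields) into
    an Elasticsearch mapping and print it as indented JSON."""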
    properties = OrderedDict()
    logger = logging.getLogger(__name__)
    doc_type = _get_next_document(documents, 'doc_type', logger)
    fields = _get_next_document(documents, 'fields', logger)
    for field, items in fields.items():
        if not items:
            # No entries: store the field but do not index it.
            properties[field] = {'type': 'string', 'index': 'no'}
        else:
            type_ = _get_field_type(items, logger)
            properties[field] = {'type': type_}
            if len(items) == 1:
                # Single analyzer (or index setting) for the field.
                properties[field].update(dict([_get_analyzer(items[0])]))
            elif len(items) > 1:
                # Multiple analyzers: emit one sub-field per analyzer.
                properties[field]['type'] = 'multi_field'
                sub_fields = OrderedDict()
                for item in items:
                    key, analyzer = _get_analyzer(item)
                    field_name = _get_field_name(field, analyzer)
                    sub_fields[field_name] = {'type': type_}
                    sub_fields[field_name].update({key: analyzer})
                properties[field]['fields'] = sub_fields
    mapping = {doc_type: {"properties": properties}}
    print(json.dumps(mapping, indent=4, separators=(',', ': ')))


def main():
    parser = argparse.ArgumentParser()
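    # Read the field definitions from a named file, or from stdin when
    # no filename is given.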
    parser.add_argument('filename', nargs='?', type=argparse.FileType('r'),
                        default=sys.stdin)
    args = parser.parse_args()
    generate_mapping(yaml_ordered_load(args.filename))


if __name__ == '__main__':
    main()