# Accepts a YAML file with field names, types & analyzers and generates an Elasticsearch JSON mapping file
# -*- coding: utf-8 -*-
"""
Simple ElasticSearch Field Mapping Template Generator
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Example YAML file
-----------------
::
sandbox
---
form_number:
- lower_alphanum
- lower_alphanum_prefix
form_name:
- lower_keyword
- snowball
revision_date:
- date
revision_number:
revision_replaces:
revision_replaced_by:
revision_ready_for_publication:
- boolean
* The first line of the YAML file must be the Elasticsearch doc-type
"""
import argparse
import json
import logging
import sys
from collections import OrderedDict

try:
    import yaml
except ImportError:
    # PyYAML is a hard requirement: without it the module-level class
    # definitions below would raise NameError, so fail fast instead.
    print('Requires pyyaml. Try: pip install pyyaml')
    sys.exit(1)

# Field types Elasticsearch recognizes as "core" simple types; any other
# entry in the YAML item list is treated as an analyzer name.
ES_CORE_TYPES = ['string', 'boolean', 'integer', 'short', 'long', 'float',
                 'double', 'byte', 'binary', 'token_count', 'date']
# -----------------------------------------------------------------------------
# Load YAML into an OrderedDict
# reference: http://stackoverflow.com/questions/5121931
class OrderedLoader(yaml.SafeLoader):
    """SafeLoader subclass used for order-preserving mapping loads.

    A dedicated subclass is used so the constructor registered in
    yaml_ordered_load does not affect the shared yaml.SafeLoader.
    """
def construct_mapping(loader, node):
    """Construct a YAML mapping node as an OrderedDict, preserving key order."""
    loader.flatten_mapping(node)
    pairs = loader.construct_pairs(node)
    return OrderedDict(pairs)
def yaml_ordered_load(stream):
    """Parse every YAML document in *stream* with mappings as OrderedDicts.

    Returns the lazy iterator produced by yaml.load_all.
    """
    mapping_tag = yaml.resolver.BaseResolver.DEFAULT_MAPPING_TAG
    OrderedLoader.add_constructor(mapping_tag, construct_mapping)
    return yaml.load_all(stream, OrderedLoader)
# ----------------------------------------------------------------------------
def _get_next_document(documents, name, logger):
logger = logging.getLogger(__name__)
try:
return next(documents)
except StopIteration:
logger.warning('No {} found the YAML file.'.format(name))
sys.exit()
def _get_field_type(items, logger):
    """Pop and return the leading Elasticsearch core type from *items*.

    Falls back to 'string' when the first entry is not a core type (the
    entry is left in *items* to be treated as an analyzer) or when *items*
    cannot be indexed (the TypeError is logged as a warning).
    """
    field_type = 'string'
    try:
        if items[0] in ES_CORE_TYPES:
            # Mutates the caller's list: the type entry is consumed so
            # only analyzer names remain.
            field_type = items.pop(0)
    except TypeError as error:
        logger.warning(error)
    return field_type
def _get_analyzer(analyzer):
if analyzer == 'no':
return 'index', 'no'
elif analyzer == 'not_analyzed':
return 'index', 'not_analyzed'
else:
return 'analyzer', analyzer
def _get_field_name(field_name, analyzer):
if analyzer == 'not_indexed':
return field_name
return '.'.join([field_name, analyzer])
def generate_mapping(documents):
    """Build the Elasticsearch mapping from the YAML documents and print it.

    :param documents: iterator yielding two YAML documents: the doc-type
        string, then an (ordered) mapping of field name -> list of
        type/analyzer entries (or None for stored-but-not-indexed fields)
    :returns: the mapping dict that was printed (also printed as JSON)
    """
    logger = logging.getLogger(__name__)
    doc_type = _get_next_document(documents, 'doc_type', logger)
    fields = _get_next_document(documents, 'fields', logger)

    properties = OrderedDict()
    for field, items in fields.items():
        if not items:
            # No entries listed: store the field but do not index it.
            properties[field] = {'type': 'string', 'index': 'no'}
            continue
        type_ = _get_field_type(items, logger)
        properties[field] = {'type': type_}
        if len(items) == 1:
            key, value = _get_analyzer(items[0])
            properties[field][key] = value
        elif len(items) > 1:
            # Multiple analyzers: emit a multi_field with one sub-field
            # per analyzer. (Renamed from the original inner 'fields',
            # which shadowed the outer dict being iterated.)
            properties[field]['type'] = 'multi_field'
            sub_fields = OrderedDict()
            for item in items:
                key, analyzer = _get_analyzer(item)
                sub_name = _get_field_name(field, analyzer)
                sub_fields[sub_name] = {'type': type_, key: analyzer}
            properties[field]['fields'] = sub_fields

    mapping = {doc_type: {"properties": properties}}
    print(json.dumps(mapping, indent=4, separators=(',', ': ')))
    return mapping
def main():
    """Entry point: read YAML from the filename argument (or stdin) and
    print the generated Elasticsearch mapping as JSON."""
    arg_parser = argparse.ArgumentParser()
    arg_parser.add_argument('filename', nargs='?', type=argparse.FileType('r'),
                            default=sys.stdin)
    options = arg_parser.parse_args()
    generate_mapping(yaml_ordered_load(options.filename))


if __name__ == '__main__':
    main()