shota-fukawa
11/21/2017 - 8:40 AM

KeyValue形式のCollectionによるFilterのやり方

# -*- coding: utf-8 -*-
from __future__ import absolute_import, print_function

import argparse
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


def _keep_allowed(element):
    """Keep only the "base" values that also appear under "filter".

    Args:
        element: A ``(key, grouped)`` pair as produced by ``CoGroupByKey``
            over the inputs tagged ``"base"`` and ``"filter"``; ``grouped``
            maps each tag to the values collected for ``key``.

    Returns:
        A ``(key, kept)`` pair where ``kept`` is the list of "base" values
        that are also present among the "filter" values, in "base" order.
    """
    key, grouped = element
    # Build the lookup once per key so each membership test is O(1).
    allowed = set(grouped["filter"])
    return key, [value for value in grouped["base"] if value in allowed]


def _print_pair(element):
    """Print a ``(key, value)`` element as ``key value``."""
    key, value = element
    print(key, value)


def run(argv=None):
    """Run a demo pipeline that filters one keyed collection by another.

    Two in-memory key/value collections are co-grouped by key, and for each
    key only the "base" values that also occur in "filter" are kept. Both the
    grouped and the filtered elements are printed.

    Args:
        argv: Optional list of command-line arguments. Anything the local
            parser does not recognise is forwarded as pipeline options.
            ``None`` makes ``argparse`` read ``sys.argv[1:]``.
    """
    parser = argparse.ArgumentParser()
    _, pipeline_args = parser.parse_known_args(argv)
    # Forward the leftover arguments so the ``argv`` given to run() is
    # actually honoured when configuring the pipeline.
    pipeline = beam.Pipeline(options=PipelineOptions(pipeline_args))

    base_list = [
        ("k001", "v001"),
        ("k001", "v002"),
        ("k001", "v003"),
        ("k002", "v011"),
        ("k002", "v012"),
    ]

    filter_list = [
        ("k001", "v001"),
        ("k001", "v003"),
        ("k002", "v012"),
    ]

    base = pipeline | "base" >> beam.Create(base_list)
    fil = pipeline | "filter" >> beam.Create(filter_list)

    group_coll = {"base": base, "filter": fil} | beam.CoGroupByKey()

    # Inspect the grouped elements. Explicit labels are required because the
    # same callable is used by two Map steps in this pipeline.
    # Expected output (ordering may vary):
    #   k001 {'filter': ['v001', 'v003'], 'base': ['v001', 'v002', 'v003']}
    #   k002 {'filter': ['v012'], 'base': ['v011', 'v012']}
    group_coll | "print grouped" >> beam.Map(_print_pair)

    # The grouped values are ordinary Python iterables at this point, so they
    # are filtered with plain Python rather than by applying beam.Filter to
    # them, which would build and run a nested pipeline for every key.
    # Expected output (ordering may vary):
    #   k001 ['v001', 'v003']
    #   k002 ['v012']
    (group_coll
     | "keep allowed" >> beam.Map(_keep_allowed)
     | "print filtered" >> beam.Map(_print_pair)
     )

    result = pipeline.run()
    result.wait_until_finish()

# Script entry point: run the demo pipeline only when executed directly,
# not when this module is imported.
if __name__ == "__main__":
    run()