# -*- coding: utf-8 -*-
from __future__ import absolute_import, print_function
import argparse
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
def _print_key_values(key_values):
    """Print a ``(key, value)`` pair as ``key value``.

    Used with ``beam.Map`` purely for its printing side effect; returns
    ``None`` (as the original ``print`` lambda did).
    """
    key, value = key_values
    print(key, value)


def _keep_filtered(key_values):
    """Keep only the "base" values that also occur in the "filter" group.

    Args:
        key_values: A ``(key, groups)`` element produced by applying
            ``CoGroupByKey`` to ``{"base": ..., "filter": ...}``, where
            ``groups`` maps each tag to the values grouped under ``key``.

    Returns:
        ``(key, kept)`` where ``kept`` is a list of the "base" values that
        also appear in the "filter" group, in their original "base" order.
    """
    key, groups = key_values
    # Build the lookup set once: O(1) membership per base value, and safe
    # even if the grouped values are a one-shot iterable. Assumes the values
    # are hashable (they are strings in this example).
    allowed = set(groups["filter"])
    return key, [value for value in groups["base"] if value in allowed]


def run(argv=None):
    """Filter one keyed collection's values against another's, per key.

    Two keyed PCollections ("base" and "filter") are joined with
    ``CoGroupByKey``. For each key, only the "base" values that also appear
    in that key's "filter" group are kept. The grouped and the filtered
    results are both printed.

    Args:
        argv: Command-line arguments; ``None`` lets argparse read
            ``sys.argv[1:]``. Arguments this parser does not recognize are
            forwarded to Beam as pipeline options.
    """
    parser = argparse.ArgumentParser()
    _, pipeline_args = parser.parse_known_args(argv)
    # Forward the leftover arguments (e.g. --runner) to Beam instead of
    # discarding them.
    pipeline = beam.Pipeline(options=PipelineOptions(pipeline_args))

    base_list = [
        ("k001", "v001"),
        ("k001", "v002"),
        ("k001", "v003"),
        ("k002", "v011"),
        ("k002", "v012"),
    ]
    filter_list = [
        ("k001", "v001"),
        ("k001", "v003"),
        ("k002", "v012"),
    ]

    base = pipeline | "base" >> beam.Create(base_list)
    fil = pipeline | "filter" >> beam.Create(filter_list)
    group_coll = {"base": base, "filter": fil} | beam.CoGroupByKey()

    # Inspect the grouped contents. Example output (dict key order and the
    # container type of the grouped values may differ between versions):
    #   k001 {'filter': ['v001', 'v003'], 'base': ['v001', 'v002', 'v003']}
    #   k002 {'filter': ['v012'], 'base': ['v011', 'v012']}
    group_coll | "print grouped" >> beam.Map(_print_key_values)

    # Filter in-process rather than applying beam.Filter to a plain list
    # inside a Map: that would launch a separate nested pipeline for every
    # element. Example output:
    #   k001 ['v001', 'v003']
    #   k002 ['v012']
    (group_coll
     | "keep filtered" >> beam.Map(_keep_filtered)
     | "print filtered" >> beam.Map(_print_key_values))

    result = pipeline.run()
    result.wait_until_finish()
if __name__ == '__main__':
    # Entry point when executed as a script (not on import); argv=None makes
    # argparse read the process's command-line arguments.
    run(argv=None)