def run(args=None):
pipeline = beam.Pipeline(options=PipelineOptions())
kv = pipeline | beam.Create([("k001", ["v001", "v002", "v003"]), ("k002", ["v011", "v012", "v013"])])
keys = ["k001", "k002"]
for key in keys:
(kv
| "Filter: %s" % key >> beam.Filter(lambda (k, v): k == key)
| "Map: %s" % key >> beam.Map(lambda (k, v): print(v))
)
result = pipeline.run()
result.wait_until_finish()
if __name__ == '__main__':
run()
[INFO] 2017-11-21 15:35:57 root Running pipeline with DirectRunner.
['v011', 'v012', 'v013']
['v011', 'v012', 'v013']
[INFO] 2017-11-21 15:35:57 root Application Finish
["v001", "v002", "v003"]
の行と、 ['v011', 'v012', 'v013']
の行が出るのかと思いきや
['v011', 'v012', 'v013']
が2回出力された。keysの一番最後の値が使われる?
pipeline = beam.Pipeline(options=PipelineOptions())
kv = pipeline | beam.Create([("k001", ["v001", "v002", "v003"]), ("k002", ["v011", "v012", "v013"])])
keys = ["k002", "key001"]
for key in keys:
(kv
| "Filter: %s" % key >> beam.Filter(lambda (k, v): k == key)
| "Map: %s" % key >> beam.Map(lambda (k, v): print(v))
)
result = pipeline.run()
result.wait_until_finish()
[INFO] 2017-11-21 15:48:03 root Running pipeline with DirectRunner.
['v001', 'v002', 'v003']
['v001', 'v002', 'v003']
[INFO] 2017-11-21 15:48:03 root Application Finish
keysの最後の値でFilterがかかっているもよう。
from apache_beam.typehints import with_input_types, with_output_types, Tuple, List
@with_input_types(Tuple[str, List[str]])
@with_output_types(None)
class SampleTransform(beam.PTransform):
def __init__(self, key):
self.key = key
def expand(self, pcoll):
return (pcoll
| beam.Filter(lambda (k, v): k == self.key)
| beam.Map(lambda (k, v): print(v))
)
pipeline = beam.Pipeline(options=PipelineOptions())
kv = pipeline | beam.Create([("k001", ["v001", "v002", "v003"]), ("k002", ["v011", "v012", "v013"])])
keys = ["k001", "k002"]
for key in keys:
kv | key >> SampleTransform(key)
result = pipeline.run()
result.wait_until_finish()
[INFO] 2017-11-21 15:54:06 root Running pipeline with DirectRunner.
['v001', 'v002', 'v003']
['v011', 'v012', 'v013']
[INFO] 2017-11-21 15:54:06 root Application Finish
できた。Transformクラスを作ってあげれば解決されるみたい。