shota-fukawa
11/21/2017 - 4:18 AM

How for loops behave when building a Beam pipeline

Checking how Filter behaves

Source

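from __future__ import print_function  # needed only on Python 2

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


def run(args=None):
    pipeline = beam.Pipeline(options=PipelineOptions())
    kv = pipeline | beam.Create([("k001", ["v001", "v002", "v003"]), ("k002", ["v011", "v012", "v013"])])
    keys = ["k001", "k002"]
    for key in keys:
        # One Filter/Map branch per key; both lambdas refer to the loop variable `key`
        (kv
         | "Filter: %s" % key >> beam.Filter(lambda elem: elem[0] == key)
         | "Map: %s" % key >> beam.Map(lambda elem: print(elem[1]))
         )

    result = pipeline.run()
    result.wait_until_finish()


if __name__ == '__main__':
    run()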

Output

[INFO] 2017-11-21 15:35:57 root Running pipeline with DirectRunner.
['v011', 'v012', 'v013']
['v011', 'v012', 'v013']
[INFO] 2017-11-21 15:35:57 root Application Finish

I expected one line with ['v001', 'v002', 'v003'] and one with ['v011', 'v012', 'v013'], but ['v011', 'v012', 'v013'] was printed twice. Is the last value in keys being used for both?
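The symptom seems to match Python's late binding of closures. A lambda created inside a loop looks up the loop variable when it is called, not when it is created. The sketch below reproduces the output above without Beam, assuming that the pipeline only calls the predicates after the loop has finished. `DATA`, `build_stages` and `run_stages` are hypothetical helpers, not Beam APIs.

```python
# Toy model of a deferred pipeline (hypothetical helpers, not Beam APIs):
# predicates are created inside a loop but only called after the loop ends,
# similar to how the pipeline above only executes at pipeline.run().
DATA = [("k001", ["v001", "v002", "v003"]), ("k002", ["v011", "v012", "v013"])]


def build_stages(keys):
    stages = []
    for key in keys:
        # The lambda closes over the variable `key` itself, not its current value.
        stages.append(lambda kv: kv[0] == key)
    return stages


def run_stages(stages, data):
    # Runs after build_stages() has returned, so `key` already holds its final value.
    results = []
    for pred in stages:
        for kv in data:
            if pred(kv):
                results.append(kv[1])
    return results


print(run_stages(build_stages(["k001", "k002"]), DATA))
# → [['v011', 'v012', 'v013'], ['v011', 'v012', 'v013']]
```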

Reversing the order of keys

Source

pipeline = beam.Pipeline(options=PipelineOptions())
kv = pipeline | beam.Create([("k001", ["v001", "v002", "v003"]), ("k002", ["v011", "v012", "v013"])])
keys = ["k002", "k001"]
for key in keys:
    (kv
     | "Filter: %s" % key >> beam.Filter(lambda elem: elem[0] == key)
     | "Map: %s" % key >> beam.Map(lambda elem: print(elem[1]))
     )

result = pipeline.run()
result.wait_until_finish()

Output

[INFO] 2017-11-21 15:48:03 root Running pipeline with DirectRunner.
['v001', 'v002', 'v003']
['v001', 'v002', 'v003']
[INFO] 2017-11-21 15:48:03 root Application Finish

Both Filters appear to use the last value in keys.
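If late binding is indeed the cause, the usual plain-Python fix is to bind the current value when the lambda is created, for example through a default argument. Below is a minimal sketch that uses the same kind of toy deferred pipeline and the reversed key order from above. The helpers are hypothetical and are not Beam APIs.

```python
# Hypothetical toy helpers (not Beam APIs) showing an early-binding workaround.
DATA = [("k001", ["v001", "v002", "v003"]), ("k002", ["v011", "v012", "v013"])]


def build_stages_bound(keys):
    stages = []
    for key in keys:
        # Default values are evaluated when the lambda is created, so each
        # predicate keeps the key from its own loop iteration.
        stages.append(lambda kv, key=key: kv[0] == key)
    return stages


def run_stages(stages, data):
    # Call every predicate after the loop, like a deferred pipeline run.
    return [kv[1] for pred in stages for kv in data if pred(kv)]


print(run_stages(build_stages_bound(["k002", "k001"]), DATA))
# → [['v011', 'v012', 'v013'], ['v001', 'v002', 'v003']]
```

In the Beam code, this would presumably become `beam.Filter(lambda elem, key=key: elem[0] == key)`. Because `beam.Filter` forwards extra positional arguments to its callable, `beam.Filter(lambda elem, k: elem[0] == k, key)` should also work. Neither Beam variant has been tested here.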

Moving the inline steps into a Transform class and passing key as a constructor argument

Source

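from __future__ import print_function  # needed only on Python 2

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.typehints import with_input_types, with_output_types, Tuple, List


@with_input_types(Tuple[str, List[str]])
@with_output_types(None)
class SampleTransform(beam.PTransform):
    def __init__(self, key):
        super(SampleTransform, self).__init__()
        self.key = key

    def expand(self, pcoll):
        # The lambdas read self.key, which belongs to this particular instance
        return (pcoll
                | beam.Filter(lambda elem: elem[0] == self.key)
                | beam.Map(lambda elem: print(elem[1]))
                )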

pipeline = beam.Pipeline(options=PipelineOptions())
kv = pipeline | beam.Create([("k001", ["v001", "v002", "v003"]), ("k002", ["v011", "v012", "v013"])])
keys = ["k001", "k002"]

for key in keys:
    kv | key >> SampleTransform(key)

result = pipeline.run()
result.wait_until_finish()

Output

[INFO] 2017-11-21 15:54:06 root Running pipeline with DirectRunner.
['v001', 'v002', 'v003']
['v011', 'v012', 'v013']
[INFO] 2017-11-21 15:54:06 root Application Finish

That worked. Wrapping the steps in a Transform class seems to solve the problem.
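The class version probably works because each `SampleTransform(key)` is a separate object. The lambdas close over `self`, and `self.key` is fixed for each instance, so they no longer depend on the loop variable. Below is a plain-Python analogue that assumes the same deferred execution as before. `KeyFilter` and the helpers are hypothetical and are not Beam classes.

```python
# Hypothetical plain-Python analogue of SampleTransform (not a Beam class):
# the key lives on the instance, and the lambda reads it through `self`.
DATA = [("k001", ["v001", "v002", "v003"]), ("k002", ["v011", "v012", "v013"])]


class KeyFilter(object):
    def __init__(self, key):
        self.key = key

    def predicate(self):
        # Closes over `self`; every loop iteration creates a new KeyFilter,
        # so each predicate sees its own self.key.
        return lambda kv: kv[0] == self.key


def build_stages(keys):
    stages = []
    for key in keys:
        stages.append(KeyFilter(key).predicate())
    return stages


def run_stages(stages, data):
    # Deferred execution: all predicates are called after the loop has finished.
    return [kv[1] for pred in stages for kv in data if pred(kv)]


print(run_stages(build_stages(["k001", "k002"]), DATA))
# → [['v001', 'v002', 'v003'], ['v011', 'v012', 'v013']]
```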