sandboxws
3/9/2018 - 1:45 AM

WIP MongoDB Apache Beam Sink for Python

WIP MongoDB Apache Beam Sink for Python

# Public API of this module: only the WriteToMongo transform is exported;
# the underscore-prefixed sink and writer classes are implementation details.
__all__ = ['WriteToMongo']

import json

from pymongo import MongoClient
from apache_beam.transforms import PTransform
from apache_beam.io import iobase

class _MongoSink(iobase.Sink):
    """A :class:`~apache_beam.io.iobase.Sink` targeting one MongoDB collection.

    Args:
        connection_string: Value passed to ``MongoClient`` to open the
            connection.
        database: Key used to look up the database on the client.
        collection: Key used to look up the collection on the database.
    """

    def __init__(self, connection_string, database, collection):
        self._connection_string = connection_string
        self._database = database
        self._collection = collection
        # The client is created on first access of ``client``, not here.
        self._client = None

    @property
    def client(self):
        """Return the ``MongoClient`` for this sink, creating it on first use."""
        # Compare against None explicitly so the cache never depends on how
        # the client object happens to evaluate in a boolean context.
        if self._client is None:
            self._client = MongoClient(self._connection_string)
        return self._client

    def initialize_write(self):
        """Return the target collection object.

        The returned value is what ``open_writer`` receives as
        ``init_result``.
        """
        # NOTE(review): Beam passes this result on to the writers; a live
        # pymongo collection may not survive pickling on a distributed
        # runner -- confirm before relying on this outside local runs.
        return self.client[self._database][self._collection]

    def open_writer(self, init_result, uid):
        """Return a ``_WriteToMongo`` writer bound to ``init_result``."""
        return _WriteToMongo(init_result, uid)

    def finalize_write(self, init_result, writer_results):
        """Emit a debug trace; neither argument is used."""
        # Call form of print: valid on both Python 2 and Python 3, and
        # prints the same text as the former print statement.
        print('>>>>>>>>>>>>>>>>>>> finalize_write')

class _WriteToMongo(iobase.Writer):
    """A :class:`~apache_beam.io.iobase.Writer` for writing to MongoDB.

    Args:
        init_result: Object exposing ``insert_one``; stored as the write
            target.
        uid: Identifier supplied by the sink for this writer. It is stored
            but not otherwise used in this class.
    """

    def __init__(self, init_result, uid):
        self._collection = init_result
        self._uid = uid

    def write(self, document):
        """Insert one document into the target collection.

        Args:
            document: Either JSON text (decoded with ``json.loads``) or an
                already-decoded ``dict``.
        """
        if isinstance(document, dict):
            # Insert a shallow copy so the caller's dict is never the object
            # handed to ``insert_one`` (pymongo adds an ``_id`` key to the
            # dict it receives).
            document = dict(document)
        else:
            document = json.loads(document)
        self._collection.insert_one(document)

    def close(self):
        """Emit a debug trace; returns ``None``."""
        # Call form of print: valid on both Python 2 and Python 3, and
        # prints the same text as the former print statement.
        print('>>>>>>>>>>>>>>>>>>>> close')

class WriteToMongo(PTransform):
    """A :class:`~apache_beam.transforms.PTransform` that writes via _MongoSink.

    Args:
        connection_string: Forwarded to ``_MongoSink``.
        database: Forwarded to ``_MongoSink``.
        collection: Forwarded to ``_MongoSink``.
    """

    def __init__(self, connection_string, database, collection):
        super(WriteToMongo, self).__init__()

        # Build the sink straight from the arguments, then keep the raw
        # values on the transform as well.
        self._sink = _MongoSink(connection_string, database, collection)
        self._connection_string = connection_string
        self._database = database
        self._collection = collection

    def expand(self, pcoll):
        """Apply ``iobase.Write`` with this transform's sink to ``pcoll``."""
        write_step = iobase.Write(self._sink)
        return pcoll | write_step