# WIP: MongoDB Apache Beam sink for Python.
__all__ = ['WriteToMongo']
import json
from pymongo import MongoClient
from apache_beam.transforms import PTransform
from apache_beam.io import iobase
class _MongoSink(iobase.Sink):
    """A :class:`~apache_beam.io.iobase.Sink` that writes to a MongoDB collection.

    The sink stores only the connection string and the database/collection
    names.  The ``MongoClient`` is created lazily on first use and is never
    part of the pickled state, so every process that receives a copy of the
    sink opens its own connection.

    Args:
        connection_string: Value passed unchanged to ``MongoClient``.
        database: Name used to index the client (``client[database]``).
        collection: Name used to index the database
            (``client[database][collection]``).
    """

    def __init__(self, connection_string, database, collection):
        self._connection_string = connection_string
        self._database = database
        self._collection = collection
        # Created on demand by the ``client`` property.
        self._client = None

    def __getstate__(self):
        """Return the instance state for pickling, without the cached client.

        A live client is not expected to survive pickling, so it is dropped
        here and rebuilt lazily by ``client`` after unpickling.
        """
        state = self.__dict__.copy()
        state['_client'] = None
        return state

    @property
    def client(self):
        """Return the cached ``MongoClient``, creating it on first access."""
        # Compare against None explicitly instead of relying on the
        # truthiness of a client object.
        if self._client is None:
            self._client = MongoClient(self._connection_string)
        return self._client

    def initialize_write(self):
        """Return the ``(database, collection)`` names as the init result.

        Only plain values are returned so the result can be serialized and
        handed to other processes; the actual collection handle is built in
        ``open_writer``.
        """
        return (self._database, self._collection)

    def open_writer(self, init_result, uid):
        """Create a writer bound to this sink's target collection.

        Args:
            init_result: Value produced by ``initialize_write``.  It is not
                needed here because the collection is resolved from the
                sink's own settings.
            uid: Identifier forwarded unchanged to the writer.

        Returns:
            A ``_WriteToMongo`` wrapping ``client[database][collection]``.
        """
        collection = self.client[self._database][self._collection]
        return _WriteToMongo(collection, uid)

    def pre_finalize(self, init_result, writer_results):
        """No-op pre-finalization hook.

        NOTE(review): newer Beam releases appear to call this hook before
        ``finalize_write`` -- confirm against the Beam version in use.
        """
        return None

    def finalize_write(self, init_result, writer_results,
                       pre_finalize_result=None):
        """Emit a debug marker once writing has finished.

        Args:
            init_result: Value produced by ``initialize_write`` (unused).
            writer_results: Values returned by the writers' ``close``
                (unused).
            pre_finalize_result: Optional extra argument (unused).
                NOTE(review): accepted because newer Beam releases appear to
                pass a third positional argument -- confirm.
        """
        # Single-argument call form prints the same text on Python 2 and 3.
        print('>>>>>>>>>>>>>>>>>>> finalize_write')
class _WriteToMongo(iobase.Writer):
    """A :class:`~apache_beam.io.iobase.Writer` for writing to MongoDB.

    Args:
        init_result: Object stored as the target collection; ``write`` calls
            its ``insert_one`` method.
        uid: Identifier for this writer; stored but not otherwise used here.
    """

    def __init__(self, init_result, uid):
        self._collection = init_result
        self._uid = uid

    def write(self, document):
        """Insert one document into the collection.

        Args:
            document: Either a ``dict`` or a JSON-encoded value that
                ``json.loads`` can decode.

        Raises:
            ValueError: If a non-dict ``document`` is not valid JSON.
        """
        if isinstance(document, dict):
            # insert_one adds an ``_id`` key to the mapping it receives;
            # work on a shallow copy so the caller's element is not mutated.
            document = dict(document)
        else:
            document = json.loads(document)
        self._collection.insert_one(document)

    def close(self):
        """Emit a debug marker; no writer result is returned."""
        # Single-argument call form prints the same text on Python 2 and 3.
        print('>>>>>>>>>>>>>>>>>>>> close')
class WriteToMongo(PTransform):
    """A :class:`~apache_beam.transforms.PTransform` wrapper for _MongoSink.

    Args:
        connection_string: Forwarded to ``_MongoSink``.
        database: Forwarded to ``_MongoSink``.
        collection: Forwarded to ``_MongoSink``.
    """

    def __init__(self, connection_string, database, collection):
        super(WriteToMongo, self).__init__()
        # Keep the raw settings on the transform as well as on the sink.
        self._connection_string, self._database, self._collection = (
            connection_string,
            database,
            collection,
        )
        self._sink = _MongoSink(connection_string, database, collection)

    def expand(self, pcoll):
        """Apply ``iobase.Write`` with the configured sink to ``pcoll``."""
        write_step = iobase.Write(self._sink)
        return pcoll | write_step