mwebler
7/20/2018 - 4:51 AM

Apache Beam Python: Google Datastore Query with composite filter

Apache Beam Python: Google Datastore Query with composite filter

import apache_beam as beam

from apache_beam.io.gcp.datastore.v1.datastoreio import ReadFromDatastore
from google.cloud.proto.datastore.v1 import query_pb2

from googledatastore import helper as datastore_helper
from googledatastore import PropertyFilter


# Build the pipeline.
# NOTE(review): `pipeline_options` is not defined in this snippet; it must be
# supplied by the surrounding code.
p = beam.Pipeline(options=pipeline_options)

# Base query: restrict the read to a single entity kind. This is the query
# object that is ultimately handed to ReadFromDatastore, so the combined
# filter has to be attached to *this* object.
query = query_pb2.Query()
query.kind.add().name = 'EntityKind'

# Lower bound: timestamp >= 2018-01-01 (1514764800).
# The extra Query message is only used as a holder for its `filter` field.
timestampStartQuery = query_pb2.Query()
datastore_helper.set_property_filter(
    timestampStartQuery.filter, 'timestamp',
    query_pb2.PropertyFilter.GREATER_THAN_OR_EQUAL, 1514764800)

# Upper bound: timestamp <= 2018-01-31 (1517443199).
# Must be written into timestampEndQuery's own filter; writing it into the
# start filter would overwrite the lower bound and leave this one empty.
timestampEndQuery = query_pb2.Query()
datastore_helper.set_property_filter(
    timestampEndQuery.filter, 'timestamp',
    query_pb2.PropertyFilter.LESS_THAN_OR_EQUAL, 1517443199)

# Combine both bounds with a logical AND to select entries between
# 2018-01-01 and 2018-01-31. set_composite_filter expects Filter messages
# as sub-filters, so pass the `.filter` fields rather than the Query objects.
datastore_helper.set_composite_filter(
    query.filter, query_pb2.CompositeFilter.AND,
    timestampStartQuery.filter,
    timestampEndQuery.filter)

# NOTE(review): `PROJECT_ID` is not defined in this snippet; it must be
# supplied by the surrounding code.
result = p | ReadFromDatastore(project=PROJECT_ID, query=query)