dollschasingmen
3/9/2017 - 1:17 AM

revisit visits

revisit visits

import com.lumoslabs.events.common.Event
import org.apache.spark.sql.Encoder
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.TypedColumn
import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions.udf
import java.time.Instant
import spark.implicits._

import scala.reflect.ClassTag
import scala.reflect.runtime.universe._

// functions
/** Passes `o` through only when the two names are equal; any mismatch yields None. */
def optionWhen[T](m: String, n: String, o: Option[T]): Option[T] =
  if (m != n) None
  else o

// additional functions to do extractions by predicate
/**
 * Reads property `prop` (typed as `T`) from `event`, but only for events named `eventName`.
 *
 * @param eventName event name the property is read from
 * @param prop      property key passed to `propertyVal`
 * @param event     event to inspect
 * @return the property value for matching events, None for every other event
 */
def eventPropertyForEvent[T: TypeTag: ClassTag](eventName: String)(prop: String)(event: Event): Option[T] =
  if (event.name != eventName) None
  else event.event.propertyVal[T](prop)

/** 0/1 flag: 1 when `event` is named `eventName`, otherwise 0 (summed per visit downstream). */
def eventExists(eventName: String)(event: Event): Int =
  if (event.name != eventName) 0
  else 1

/**
 * 0/1 flag marking a view of one specific page.
 *
 * Uses `Option.contains` instead of `isDefined && get == page`, and only reads the
 * "page" property once the event is known to be a "page_view".
 *
 * @param page  expected value of the event's "page" property
 * @param event event to inspect
 * @return 1 if `event` is a "page_view" whose "page" property equals `page`, else 0
 */
def pageExists(page: String)(event: Event): Int =
  if (event.name == "page_view" && event.event.property("page").asString.contains(page)) 1 else 0

// case class
/**
 * Flat, per-event projection of the fields this job needs from an `Event`.
 *
 * The Int fields are 0/1 per-event flags; the aggregation sums them into per-visit counts.
 */
case class ProjectedFields(
  // event identity and timing
  name: String,
  timestamp: Long,
  visitId: String,
  visitTimestamp: Long,
  clientId: String,
  clientTimestamp: Long,
  userId: Option[Int],
  assignmentId: String,
  ipAddress: Option[String],
  // split dimensions
  appName: Option[String],
  clientPlatform: Option[String],
  referralKey: Option[String],
  preferredLanguage: Option[String],
  // populated only for "sign_up" events
  signupReferral: Option[String],
  signupUserId: Option[Int],
  signupTimestamp: Option[Long],
  // 0/1 per-event flags
  purchasePageView: Int,
  purchase: Int,
  splashPageView: Int,
  introductionPageView: Int,
  signupPageView: Int,
  // populated only for "adjust_attribution_received" events
  adjustReferral: Option[String],
  adjustNetwork: Option[String],
  adjustCampaign: Option[String]
)

/**
 * One aggregated row per (visitId, assignmentId, ipAddress, visitTimestamp) group.
 *
 * Produced from the grouped `ProjectedFields` via `.as[VisitFact]`. `visitType` is
 * "new" or "returning"; `isUnregistered` is a 0/1 flag.
 */
case class VisitFact(
  // grouping keys
  visitId: String,
  assignmentId: String,
  ipAddress: Option[String],
  visitTimestamp: Long,
  // first non-null value seen in the visit
  clientId: String,
  clientTimestamp: Long,
  userId: Option[Long],
  signupUserId: Option[Long],
  signupReferral: Option[String],
  signupTimestamp: Option[Long],
  adjustReferral: Option[String],
  adjustNetwork: Option[String],
  adjustCampaign: Option[String],
  appName: String,
  clientPlatform: Option[String],
  preferredLanguage: Option[String],
  // summed per-event flags
  purchases: BigInt,
  purchasePageViews: BigInt,
  splashPageViews: BigInt,
  introductionPageViews: BigInt,
  signupPageViews: BigInt,
  // max / min event timestamp in the visit
  visitEndedAt: Long,
  visitStartedAt: Long,
  // derived columns
  visitType: String,
  isUnregistered: Int
)

// constants and vals
// Only events whose appName is in this set are kept.
val IosAppNames = Set(
  "Lumosity Mobile",
  "Lumosity Mobile iPad"
)
// Events with these names are dropped before projection.
val BlackListEvents = Set(
  "split_test_assigned",
  "bulk_email_unsubscribe"
)
// 150 seconds, expressed in milliseconds (visit vs. client timestamp gap for a "new" visit).
val NewVisitThresholdInMillis = 150 * 1000

// Columns that identify one visit in the aggregation.
val GroupByKeys = List("visitId", "assignmentId", "ipAddress", "visitTimestamp")

// spark job
//val events = sc.textFile("/mnt/lumos-data-dump-dev01/andy/test/events_small/")
val events = sc.textFile("/mnt/lumos-events/yyyy=2017/mm=03/dd=11")

// Keep valid, non-blacklisted iOS-app events and flatten each into a ProjectedFields row.
// Required values are pulled out with a for-comprehension instead of `.get`: the filter
// checks visit.timestamp / client.timestamp, but the row reads header.rawTimestamp,
// visit.rawTimestamp and client.rawTimestamp, which were never checked directly. An
// event missing any required value is now dropped rather than throwing NoSuchElementException.
val projectedFields = events
  .map(Event(_))
  .filter { event =>
    event.isValid &&
      !BlackListEvents.contains(event.name) &&
      IosAppNames.contains(event.appName) &&
      event.visit.timestamp.isDefined &&
      event.client.timestamp.isDefined
  }
  .flatMap { event =>
    val row = for {
      timestamp       <- event.header.rawTimestamp
      visitId         <- event.visit.id
      visitTimestamp  <- event.visit.rawTimestamp
      clientId        <- event.client.id
      clientTimestamp <- event.client.rawTimestamp
      assignmentId    <- event.assignmentId
    } yield ProjectedFields(
      name = event.name,
      timestamp = timestamp,
      visitId = visitId,
      visitTimestamp = visitTimestamp,
      clientId = clientId,
      clientTimestamp = clientTimestamp,
      userId = event.userId,
      assignmentId = assignmentId,
      ipAddress = event.ipAddress,
      appName = event.event.property("app_name").asString,
      clientPlatform = event.event.property("client_platform").asString,
      referralKey = event.event.property("referral_key").asString,
      preferredLanguage = event.preferredLanguage,
      signupReferral = eventPropertyForEvent[String]("sign_up")("referral_key")(event),
      signupUserId = eventPropertyForEvent[Int]("sign_up")("user_id")(event),
      signupTimestamp = optionWhen[Long]("sign_up", event.name, event.header.rawTimestamp),
      purchasePageView = eventExists("purchase_page_view")(event),
      purchase = eventExists("purchase")(event),
      splashPageView = pageExists("Splash")(event),
      introductionPageView = pageExists("Introduction")(event),
      signupPageView = pageExists("Signup")(event),
      adjustReferral = eventPropertyForEvent[String]("adjust_attribution_received")("referral_key")(event),
      adjustNetwork = eventPropertyForEvent[String]("adjust_attribution_received")("network")(event),
      adjustCampaign = eventPropertyForEvent[String]("adjust_attribution_received")("campaign")(event)
    )
    // RDD.flatMap needs a TraversableOnce; an absent row contributes nothing.
    row.toList
  }
  .toDS


  // Collapse per-event rows into one VisitFact per visit (grouped by GroupByKeys).
  val aggregatedVisitFacts = {
    // Per-visit attributes: keep the first non-null value seen in the group.
    val firstNonNull = Seq(
      "clientId", "clientTimestamp", "userId", "signupUserId", "signupReferral",
      "signupTimestamp", "adjustReferral", "adjustNetwork", "adjustCampaign",
      "appName", "clientPlatform", "preferredLanguage"
    ).map(colName => first(colName, ignoreNulls = true).alias(colName))

    // 0/1 per-event flags summed into per-visit counts (source column -> output column).
    val totals = Seq(
      "purchase" -> "purchases",
      "purchasePageView" -> "purchasePageViews",
      "splashPageView" -> "splashPageViews",
      "introductionPageView" -> "introductionPageViews",
      "signupPageView" -> "signupPageViews"
    ).map { case (source, target) => sum(source).alias(target) }

    // Visit span from the latest and earliest event timestamps.
    val span = Seq(
      max("timestamp").alias("visitEndedAt"),
      min("timestamp").alias("visitStartedAt")
    )

    val aggregations = firstNonNull ++ totals ++ span

    // "new" when the visit started within NewVisitThresholdInMillis of the client timestamp.
    val startedNearClientCreation = $"visitTimestamp" - $"clientTimestamp" < NewVisitThresholdInMillis
    // Unregistered: no userId at all, or a signup happened during the visit.
    val unregistered = $"userId".isNull || $"signupUserId".isNotNull

    projectedFields
      .groupBy(GroupByKeys.head, GroupByKeys.tail: _*)
      .agg(aggregations.head, aggregations.tail: _*)
      .withColumn("visitType", when(startedNearClientCreation, "new").otherwise("returning"))
      .withColumn("isUnregistered", when(unregistered, 1).otherwise(0))
      .as[VisitFact]
      .cache()
  }

  //aggregatedVisitFacts.take(10)
  aggregatedVisitFacts.groupBy($"appName", $"clientPlatform").count().collect()

# Group-bys

for sure

  • visit_id / visit_timestamp
  • ll_source - web only, undefined for mobile -- also, derive visit_referral_key from it
  • ll_channel - web only, undefined for mobile
  • ip_address/country_code

maybe???

  • assignment_id

metrics

signup metrics

  • sign_up_user_id
  • signup referral key (if a signup happens, extract referral key from there)
  • client_id / client_timestamp
  • is facebook signup
  • signup timestamp

product metrics

  • purchases
  • plan_name
  • product_name
  • fx_rate
  • conversion_type
  • usd_purchase_amt

pages

  • purchase_page_view -- this is web only?
  • billing_page_view
  • signup_page_view

landing metrics

  • landing_page_id, landing_experience_id, landing_experience_set_id
  • ll_ad_id
  • creative_id

splits

  • app_name
  • client_platform
  • preferred_language
  • dma
  • subaffiliate_id

funnel marking splits

  • is_funnel_top - currently web only, undefined/open question for mobile
  • is_new_visit - currently web only, undefined/open question for mobile
  • is unregistered visitor - literally, the visit starts off with no user_id

split test info

  • ste roll up -- maybe count by id, variant, mark by pre-reg, pre-conv
  • sta roll up?

other

  • events list?
import com.lumoslabs.events.common.Event
import org.apache.spark.sql.functions._
import org.apache.spark.sql.Encoder
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.TypedColumn
import org.apache.spark.sql.functions.udf
import java.time.Instant
import spark.implicits._

// UDFs

  // Classifies a visit from its client and visit timestamps:
  //   "unknown"   - either timestamp is null
  //   "new"       - the visit started less than 150 s (150000, presumably ms) after the client timestamp
  //   "returning" - otherwise
  // Parameters are boxed java.lang.Long so a SQL NULL can arrive as null. Option(...) of a
  // primitive Long is never empty, so the "unknown" branch used to be unreachable.
  // The gap is visit - client, matching the Dataset-based visitType expression elsewhere in
  // this file. The previous client - visit is <= 0 for any visit starting after the client,
  // which classified every such visit as "new".
  val visitType = udf((clientTimestamp: java.lang.Long, visitTimestamp: java.lang.Long) =>
    (Option(clientTimestamp), Option(visitTimestamp)) match {
      case (Some(client), Some(visit)) =>
        if (visit.longValue - client.longValue < 150000L) "new" else "returning"
      case _ => "unknown"
    }
  )

// True when the visit has no userId, or a signup (signupUserId) happened during it.
// Parameters are boxed java.lang.Integer so a SQL NULL arrives as null. With primitive Int
// parameters, Option(userId).isEmpty was always false and Option(signupUserId).isDefined
// always true, so the UDF returned true for every non-null input.
val isUnregistered = udf((userId: java.lang.Integer, signupUserId: java.lang.Integer) =>
  Option(userId).isEmpty || Option(signupUserId).isDefined
)

// functions
/**
 * Derives a referral key from an Adjust attribution event's network/campaign.
 *
 * Only events named [[AdjustAttributionEvent]] produce a value; every other event yields None.
 */
object AdjustReferralExtract extends Serializable {
  val AdjustAttributionEvent = "adjust_attribution_received"
  // "<word> ... (<digits>)": the referral key is the leading word.
  private val CreativePattern = "\\A\\w+ .*\\((\\d+)\\)\\z".r
  // A campaign that is just digits is itself the referral key.
  private val IntegerPattern = "\\A(\\d+)\\z".r

  /**
   * @param name     event name
   * @param network  Adjust network, if present
   * @param campaign Adjust campaign, if present
   * @return the referral key for Adjust attribution events, or None when nothing matches
   */
  def extract(name: String, network: Option[String], campaign: Option[String]): Option[String] =
    if (name != AdjustAttributionEvent) None
    else (network, campaign) match {
      case (Some("YouTubeInstalls"), _) => Some("5217")
      // CreativePattern guarantees a leading run of word characters followed by a space,
      // so the first space-separated token is that leading word.
      case (_, Some(creative @ CreativePattern(_))) => Some(creative.split(" ")(0))
      case (_, Some(IntegerPattern(plainId))) => Some(plainId)
      // Whitespace-only, unrecognised or missing campaigns all yield None. The former
      // EmptyPattern case bound one group on a zero-group regex and could never match.
      case _ => None
    }
}

/** Returns `o` unchanged when `m` equals `n`; for any other pair of names returns None. */
def optionWhen[T](m: String, n: String, o: Option[T]): Option[T] =
  if (m != n) None
  else o
/** 0/1 flag: 1 when event name `n` equals the wanted name `m`, otherwise 0. */
def eventExists(m: String, n: String): Int =
  if (m != n) 0
  else 1
/**
 * 0/1 flag marking a view of one specific page.
 *
 * @param mPage page name to look for
 * @param n     event name
 * @param page  the event's "page" property, if any
 * @return 1 if `n` is "page_view" and `page` is exactly `mPage`, else 0
 */
def pageExists(mPage: String, n: String, page: Option[String]): Int =
  // Option.contains replaces the isDefined && get == ... pair.
  if (n == "page_view" && page.contains(mPage)) 1 else 0

// case class
/**
 * Flat, per-event projection of the fields this job needs from an `Event`.
 *
 * The Int fields are 0/1 per-event flags intended to be summed per visit.
 */
case class ProjectedFields(
  // event identity and timing
  name: String,
  timestamp: Long,
  visitId: String,
  visitTimestamp: Long,
  clientId: String,
  clientTimestamp: Long,
  userId: Option[Int],
  assignmentId: String,
  ipAddress: Option[String],
  // split dimensions
  appName: Option[String],
  clientPlatform: Option[String],
  referralKey: Option[String],
  preferredLanguage: Option[String],
  // populated only for "sign_up" events
  signupReferral: Option[String],
  signupUserId: Option[Int],
  signupTimestamp: Option[Long],
  // 0/1 per-event flags
  purchasePageView: Int,
  purchase: Int,
  splashPageView: Int,
  introductionPageView: Int,
  signupPageView: Int,
  // referral key derived by AdjustReferralExtract (None for non-attribution events)
  adjustReferral: Option[String]
)

/**
 * One aggregated row per visit (keys: visitId, assignmentId, ipAddress, visitTimestamp).
 *
 * Counts and the unregistered flag are modelled as Options here.
 */
case class VisitFact(
  // grouping keys
  visitId: String,
  assignmentId: String,
  ipAddress: Option[String],
  visitTimestamp: Long,
  // first non-null value seen in the visit
  clientId: String,
  clientTimestamp: Long,
  userId: Option[Long],
  signupUserId: Option[Long],
  signupReferral: Option[String],
  signupTimestamp: Option[Long],
  adjustReferral: Option[String],
  appName: String,
  clientPlatform: Option[String],
  preferredLanguage: Option[String],
  // summed per-event flags
  purchases: Option[Long],
  purchasePageViews: Option[Long],
  // max / min event timestamp in the visit
  visitEndedAt: Long,
  visitStartedAt: Long,
  // derived columns
  visitType: String,
  isUnregistered: Option[Boolean]
)

// constants and vals
// Only events whose appName is in this set are kept.
val IosAppNames = Set(
  "Lumosity Mobile",
  "Lumosity Mobile iPad"
)
// Events with these names are dropped before projection.
val BlackListEvents = Set(
  "split_test_assigned",
  "bulk_email_unsubscribe"
)

// Columns that identify one visit in the aggregation.
val GroupByKeys = List("visitId", "assignmentId", "ipAddress", "visitTimestamp")

// spark job
//val events = sc.textFile("/mnt/lumos-data-dump-dev01/andy/test/events_small/")
val events = sc.textFile("/mnt/lumos-events/yyyy=2017/mm=03/dd=11")

// Keep valid, non-blacklisted iOS-app events and flatten each into a ProjectedFields row.
// Required values are pulled out with a for-comprehension instead of `.get`: the filter
// checks visit.timestamp / client.timestamp, but the row reads header.rawTimestamp,
// visit.rawTimestamp and client.rawTimestamp, which were never checked directly. An
// event missing any required value is now dropped rather than throwing NoSuchElementException.
val projectedFields = events
  .map(Event(_))
  .filter { event =>
    event.isValid &&
      !BlackListEvents.contains(event.name) &&
      IosAppNames.contains(event.appName) &&
      event.visit.timestamp.isDefined &&
      event.client.timestamp.isDefined
  }
  .flatMap { event =>
    val row = for {
      timestamp       <- event.header.rawTimestamp
      visitId         <- event.visit.id
      visitTimestamp  <- event.visit.rawTimestamp
      clientId        <- event.client.id
      clientTimestamp <- event.client.rawTimestamp
      assignmentId    <- event.assignmentId
      // properties read more than once below; look them up a single time
      page = event.event.property("page").asString
      referralKey = event.event.property("referral_key").asString
    } yield ProjectedFields(
      name = event.name,
      timestamp = timestamp,
      visitId = visitId,
      visitTimestamp = visitTimestamp,
      clientId = clientId,
      clientTimestamp = clientTimestamp,
      userId = event.userId,
      assignmentId = assignmentId,
      ipAddress = event.ipAddress,
      appName = event.event.property("app_name").asString,
      clientPlatform = event.event.property("client_platform").asString,
      referralKey = referralKey,
      preferredLanguage = event.preferredLanguage,
      signupReferral = optionWhen[String]("sign_up", event.name, referralKey),
      signupUserId = optionWhen[Int]("sign_up", event.name, event.userId),
      signupTimestamp = optionWhen[Long]("sign_up", event.name, event.header.rawTimestamp),
      purchasePageView = eventExists("purchase_page_view", event.name),
      purchase = eventExists("purchase", event.name),
      splashPageView = pageExists("Splash", event.name, page),
      introductionPageView = pageExists("Introduction", event.name, page),
      signupPageView = pageExists("Signup", event.name, page),
      adjustReferral = AdjustReferralExtract.extract(
        event.name,
        event.event.property("network").asString,
        event.event.property("campaign").asString
      )
    )
    // RDD.flatMap needs a TraversableOnce; an absent row contributes nothing.
    row.toList
  }
  .toDS

  projectedFields.printSchema()

  // Per-visit aggregations: first non-null attribute values, summed flags, and the visit span.
  val previewAggregations =
    Seq(
      "clientId", "clientTimestamp", "userId", "signupUserId", "signupReferral",
      "signupTimestamp", "adjustReferral", "appName", "clientPlatform", "preferredLanguage"
    ).map(colName => first(colName, ignoreNulls = true).alias(colName)) ++
    Seq(
      sum("purchase").alias("purchases"),
      sum("purchasePageView").alias("purchasePageViews"),
      max("timestamp").alias("visitEndedAt"),
      min("timestamp").alias("visitStartedAt")
    )

  // Preview: aggregate per visit, derive visitType / isUnregistered via the UDFs, take 20 rows.
  projectedFields
    .groupBy(GroupByKeys.head, GroupByKeys.tail: _*)
    .agg(previewAggregations.head, previewAggregations.tail: _*)
    .withColumn("visitType", visitType($"clientTimestamp", $"visitTimestamp"))
    .withColumn("isUnregistered", isUnregistered($"userId", $"signupUserId"))
    .take(20)

CREATE TABLE

-- Visit-level facts for web traffic, loaded from gzipped TSV by the COPY into
-- views.web_visit_facts. That COPY has no column list, so the column order below
-- must match the field order of the TSV files exactly.
CREATE TABLE views.web_visit_facts
(
  visit_id VARCHAR(96) ENCODE LZO,
  assignment_id VARCHAR(64) ENCODE LZO,
  ip_address VARCHAR(96) ENCODE LZO,
  country_code VARCHAR(2) ENCODE LZO,
  dma INTEGER ENCODE DELTA,
  visit_timestamp BIGINT ENCODE DELTA32K,
  -- web-only attribution fields
  ll_source VARCHAR(256) ENCODE LZO,
  ll_channel VARCHAR(256) ENCODE LZO,
  visit_referral_key VARCHAR(64) ENCODE LZO,
  client_id VARCHAR(64) ENCODE LZO,
  client_timestamp BIGINT ENCODE DELTA32K,
  user_id INTEGER ENCODE DELTA,
  signup_user_id INTEGER ENCODE DELTA,
  signup_referral_key VARCHAR(96) ENCODE LZO,
  signup_timestamp BIGINT ENCODE DELTA32K,
  ll_ad_id BIGINT ENCODE DELTA32K,
  creative_id BIGINT ENCODE DELTA32K,
  subaffiliate_id VARCHAR(256) ENCODE LZO,
  -- NOTE(review): stored as VARCHAR(16) rather than BOOLEAN; confirm this is intended.
  is_facebook_signup VARCHAR(16) ENCODE LZO,
  app_name VARCHAR(64) ENCODE LZO,
  client_platform VARCHAR(64) ENCODE LZO,
  -- VARCHAR(2): longer values are cut to 2 chars by TRUNCATECOLUMNS on COPY.
  preferred_language VARCHAR(2) ENCODE LZO,
  landing_page_id INTEGER ENCODE DELTA,
  landing_experience_id INTEGER ENCODE DELTA,
  landing_experience_set_id INTEGER ENCODE DELTA,
  purchases INTEGER ENCODE DELTA,
  purchase_page_views INTEGER ENCODE DELTA,
  homepage_views INTEGER ENCODE DELTA,
  billing_page_views INTEGER ENCODE DELTA,
  signup_page_views INTEGER ENCODE DELTA,
  visit_ended_timestamp BIGINT ENCODE DELTA32K,
  visit_started_timestamp BIGINT ENCODE DELTA32K,
  visit_type VARCHAR(16) ENCODE LZO,
  is_unregistered BOOLEAN,
  is_funnel_top BOOLEAN,
  is_new_visitor BOOLEAN
)
-- NOTE(review): user_id is presumably NULL for unregistered visits; with DISTKEY(user_id)
-- those rows would share one distribution value and could skew slices. Confirm.
DISTSTYLE KEY
DISTKEY(user_id)
SORTKEY(visit_timestamp);

grant select on table views.web_visit_facts to GROUP read_only;
-- Visit-level facts for iOS app traffic, loaded from gzipped TSV by the COPY into
-- views.ios_visit_facts. That COPY has no column list, so the column order below
-- must match the field order of the TSV files exactly.
-- NOTE(review): the Scala VisitFact case class has no country_code / dma fields. If the
-- TSV is written straight from VisitFact, every column after ip_address would shift
-- under the positional COPY. Confirm against the writer.
CREATE TABLE views.ios_visit_facts
(
  visit_id VARCHAR(96) ENCODE LZO,
  assignment_id VARCHAR(64) ENCODE LZO,
  ip_address VARCHAR(96) ENCODE LZO,
  country_code VARCHAR(2) ENCODE LZO,
  dma INTEGER ENCODE DELTA,
  visit_timestamp BIGINT ENCODE DELTA32K,
  client_id VARCHAR(64) ENCODE LZO,
  client_timestamp BIGINT ENCODE DELTA32K,
  user_id INTEGER ENCODE DELTA,
  signup_user_id INTEGER ENCODE DELTA,
  signup_referral_key VARCHAR(96) ENCODE LZO,
  signup_timestamp BIGINT ENCODE DELTA32K,
  -- Adjust attribution fields (mobile-only counterpart of the web ll_* fields)
  adjust_referral VARCHAR(96) ENCODE LZO,
  adjust_network VARCHAR(256) ENCODE LZO,
  adjust_campaign VARCHAR(256) ENCODE LZO,
  app_name VARCHAR(64) ENCODE LZO,
  client_platform VARCHAR(64) ENCODE LZO,
  -- VARCHAR(2): longer values are cut to 2 chars by TRUNCATECOLUMNS on COPY.
  preferred_language VARCHAR(2) ENCODE LZO,
  purchases INTEGER ENCODE DELTA,
  purchase_page_views INTEGER ENCODE DELTA,
  splash_page_views INTEGER ENCODE DELTA,
  introduction_page_views INTEGER ENCODE DELTA,
  signup_page_views INTEGER ENCODE DELTA,
  visit_ended_timestamp BIGINT ENCODE DELTA32K,
  visit_started_timestamp BIGINT ENCODE DELTA32K,
  visit_type VARCHAR(16) ENCODE LZO,
  -- NOTE(review): VisitFact.isUnregistered is an Int (0/1); confirm the loaded value parses as BOOLEAN.
  is_unregistered BOOLEAN
)
-- NOTE(review): user_id is presumably NULL for unregistered visits; with DISTKEY(user_id)
-- those rows would share one distribution value and could skew slices. Confirm.
DISTSTYLE KEY
DISTKEY(user_id)
SORTKEY(visit_timestamp);

grant select on table views.ios_visit_facts to GROUP read_only;

COPY

-- Load one day of web visit facts (gzipped, tab-delimited) from S3.
-- No column list: fields load positionally in table-column order.
-- EMPTYASNULL / NULL 'null' turn empty strings and the literal text "null" into NULL.
-- FILLRECORD pads short rows with NULLs, which can hide a column-count mismatch.
-- TRUNCATECOLUMNS cuts over-long VARCHAR values to the column width.
-- Credentials are intentionally blank here; supply them (or an IAM role) at run time.
-- Terminated with ';' so the following statement is not parsed as part of this COPY.
COPY views.web_visit_facts
FROM 's3://lumos-data/web_visit_facts/tsv/yyyy=2017/mm=05/dd=11'
CREDENTIALS 'aws_access_key_id= ;aws_secret_access_key= '
GZIP DELIMITER '\t' EMPTYASNULL FILLRECORD NULL 'null' TRUNCATECOLUMNS;
-- Load one day of iOS visit facts (gzipped, tab-delimited) from S3.
-- No column list: fields load positionally in table-column order.
-- EMPTYASNULL / NULL 'null' turn empty strings and the literal text "null" into NULL.
-- FILLRECORD pads short rows with NULLs, which can hide a column-count mismatch.
-- TRUNCATECOLUMNS cuts over-long VARCHAR values to the column width.
-- Credentials are intentionally blank here; supply them (or an IAM role) at run time.
-- Terminated with ';' so the statement is complete when run as part of a script.
COPY views.ios_visit_facts
FROM 's3://lumos-data/ios_visit_facts/tsv/yyyy=2017/mm=05/dd=11'
CREDENTIALS 'aws_access_key_id= ;aws_secret_access_key= '
GZIP DELIMITER '\t' EMPTYASNULL FILLRECORD NULL 'null' TRUNCATECOLUMNS;