iniyanp
2/15/2017 - 1:04 AM

Learning_From_Gazelle

Learning_From_Gazelle

Req: we are building a service, gazelle (an API), in which every operation is associated with a user context.
It also deals with a database.

Gazelle operations:
1. getFollow,
2. createFollow.

The Application class should have one transformation which takes a gazelle operation and returns
Kleisli[Task, UsrCtx, A], which means it returns a (wrapped) function that takes a UsrCtx and
gives a Task[A] for that operation.

This is because each and every operation depends on UsrCtx.

// Package-level aliases and implicits shared across the gazelle service.
package object gazelle {
  // A computation that needs a UsrCtx and yields its result of type A inside Task.
  type UsrCtxK[A] = Kleisli[Task, UsrCtx, A]
  // The service interpreter: a natural transformation from GazelleOp to UsrCtxK.
  type GazelleService = GazelleOp ~> UsrCtxK
  // A free program built over the DbOp operations.
  type DbOpF[A] = Free[DbOp, A]

  // Implicit conversion that wraps a Logger in LogOps.
  implicit def logOps(log: Logger): LogOps = LogOps(log)
}

// The user context: the environment type of UsrCtxK (Kleisli[Task, UsrCtx, A]).
// Bundles the user, the device id and the transaction id.
final case class UsrCtx(
    user: User,
    deviceId: DeviceId,
    transactionId: TransactionId
)

GazelleOp is an algebra: the data type that describes the gazelle operations (getFollow, createFollow, ...).

When the interpreter lives inside an object, we can get the interpreter's result for an operation like this:

val p = app.gzSvc(CreateFollow(follow))

p is a UsrCtxK[A], i.e. Kleisli[Task, UsrCtx, A].

When we run p with a UsrCtx, we get a Task[A]:
val p1 = p.run(ctx)
p1 is a Task[CreateFollowResponse].



Ultimately, we want to go from a GazelleOp to its result A, but we cannot get that unless we have a UsrCtx.
So it is better to have an interpreter like GazelleOp ~> Kleisli[Task, UsrCtx, A]. Amazing, right :) !!!


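So liftKleisli is used to lift a computation into Kleisli: the resulting Kleisli ignores its input (here the UsrCtx) and simply returns the original computation.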

Example:
val p:Task[Response] = ???
val q = p.liftKleisli[UsrCtx]
q //Kleisli[Task, UsrCtx, Response]


package gazelle

import scalaz.~>
import doobie.imports._
import doobie.hi.connection._
import DbOp._
import java.util.UUID
import java.time.Instant
import scala.concurrent.duration._
import peddler.{SeriesId, TitleId}
import gazelle.scheduler.Timestamp

/**
  * Interprets the `DbOp` algebra into doobie `ConnectionIO` programs.
  *
  * The SQL below relies on MySQL-specific syntax (`ON DUPLICATE KEY UPDATE`,
  * `INET_ATON` / `INET_NTOA`).
  *
  * Operations that are not implemented yet are mapped to `???` and therefore
  * throw `scala.NotImplementedError` as soon as they are interpreted.
  */
object MySqlInterpreter extends (DbOp ~> ConnectionIO) {

  /** `Instant` is stored as a `Long` holding epoch milliseconds. */
  implicit val instantMeta: Meta[Instant] =
    Meta[Long].xmap(Instant.ofEpochMilli, _.toEpochMilli)

  /** `UUID` is stored in its canonical `String` form. */
  implicit val uuidMeta: Meta[UUID] =
    Meta[String].nxmap(UUID.fromString, _.toString)

  /** `FollowType` is stored as the `String` produced by `asString`. */
  implicit val followTypeMeta: Meta[FollowType] =
    Meta[String].nxmap(FollowType.fromString, _.asString)

  /** `ExtraTime` is stored as an `Int` number of whole minutes. */
  implicit val extraTimeMeta: Meta[ExtraTime] =
    // Method-call syntax instead of the postfix form `x minutes`, which needs
    // `scala.language.postfixOps` to compile without a warning/error.
    Meta[Int].xmap(x => ExtraTime(x.minutes), _.duration.toMinutes.toInt)

  /**
    * Maps a `Follow` to and from a row of the shape
    * `(id, follow_type, program_id, series_id, title_id, extra_minutes,
    * new_episodes_only, is_protected)` -- the column order used by [[getFollow]].
    *
    * Reading: the third column (`program_id`) is ignored. A `Title` row needs a
    * `title_id`; a `Series` row needs both a `series_id` and `new_episodes_only`.
    *
    * Writing: the third column is the title id (for a `TitleFollow`) or the
    * series id (for a `SeriesFollow`) rendered with `asString`.
    *
    * Throws `IllegalArgumentException` when a row cannot be decoded.
    */
  implicit val followComposite: Composite[Follow] =
    Composite[
      (FollowId,
       FollowType,
       String,
       Option[SeriesId],
       Option[TitleId],
       Option[ExtraTime],
       Option[NewEpisodesOnly],
       IsProtected)].xmap({
      case (fid, FollowType.Title, _, sId, Some(tid), et, _, ip) =>
        TitleFollow(fid, tid, sId, et, ip)
      case (fid, FollowType.Series, _, Some(sid), _, et, Some(neo), ip) =>
        SeriesFollow(fid, sid, neo, et, ip)
      case (fid, ft, _, sId, tId, _, neo, _) =>
        // Reached for any row that matches neither shape above (e.g. a Title
        // row without title_id, or a Series row without series_id or
        // new_episodes_only), so the message reports what was actually read.
        throw new IllegalArgumentException(
          s"Cannot decode follow $fid of type ${ft.asString} " +
            s"(seriesId=$sId, titleId=$tId, newEpisodesOnly=$neo): " +
            "a Title follow requires title_id; " +
            "a Series follow requires series_id and new_episodes_only")
    }, {
      case TitleFollow(fid, tId, sId, et, ip) =>
        (fid,
         FollowType.Title: FollowType,
         tId.asString,
         sId,
         Some(tId),
         et,
         None,
         ip)
      case SeriesFollow(fid, sId, neo, extraTime, isProtected) =>
        (fid,
         FollowType.Series: FollowType,
         sId.asString,
         Some(sId),
         None,
         extraTime,
         Some(neo),
         isProtected)
    })

  /** `DvrStatus` is stored as the `String` produced by `asString`. */
  implicit val dvrStatusMeta: Meta[DvrStatus] =
    Meta[String].nxmap(DvrStatus.fromString, _.asString)

  /**
    * Translates a single `DbOp` into the `ConnectionIO` that performs it.
    *
    * Cases mapped to `???` are not implemented and throw
    * `scala.NotImplementedError` when matched.
    */
  def apply[A](op: DbOp[A]): ConnectionIO[A] = op match {
    case PutAccount(acctId, provisionedSpaceGb, streamLimit) =>
      putAccount(acctId, provisionedSpaceGb, streamLimit)
    case GetAccount(acctId) => getAccount(acctId)
    case CreateFollow(user, f) =>
      // The id and creation time are generated inside `delay`, i.e. as steps
      // of the ConnectionIO program rather than while it is being built.
      for {
        followId <- delay(UUID.randomUUID)
        created <- delay(Instant.now())
        res <- createFollow(followId, created, user, f)
      } yield res
    case CreateRecordings(recs) => ???
    case CreateRecording(r) => createRecording(r)
    case DiskSpaceConflict(acctId) => ???
    case GetFollow(id) => getFollow(id)
    case GetSeriesFollow(user, seriesId) => ???
    case UpdateFollow(id, extraTime, isProtected, newEpisodesOnly) =>
      updateFollow(id, extraTime, isProtected, newEpisodesOnly)
    case GetTitleFollow(user, titleId) => ???
    case DeleteFollow(id) => deleteFollow(id)
    case GetAcctFutRecordings(acct, now) => getAcctFutRecordings(acct, now)
    case GetAcctRecordings(acctId, titleId) =>
      getAcctRecordings(acctId, titleId)
    case GetRecordingsBySeriesOrTitle(sot) => ???
    case GetAcctUsedSpace(acctId) => ???
    case GetEstimatedFutureSpace(acctId) => ???
    case GetAcctTotalSpace(acctId) => ???
    case GetRecordings(user, titles) => ???
    case RemoveRecording(rId) => deleteRecording(rId)
    case UpdateRecording(rec) => ???
    case PutDvr(dvr) => putDvr(dvr)
    case GetDvr(acctId) => getDvr(acctId)
    case DeleteDvr(dvrId) => deleteDvr(dvrId)
  }

  // Shared SELECT ... FROM ... JOIN prefix for the recording queries; callers
  // append their own WHERE clause.
  // NOTE(review): the literal `true` column presumably fills a boolean field
  // of Recording that is not stored in these tables -- confirm which one.
  private val selectRecordingFragment =
    """
      SELECT
        r.id,
        r.acct_id,
        ro.profile_id,
        r.title_id,
        r.airing_id,
        r.channel_id,
        true,
        r.air_start_time,
        r.air_end_time,
        ro.extra_minutes,
        ro.is_protected
      FROM recording r
      INNER JOIN user_rec_opts ro
      ON r.id = ro.recording_id
    """

  /**
    * Upserts an account row: inserts it, or on a duplicate key updates
    * `provisioned_space_gb` and `stream_limit`. The affected-row count is
    * discarded.
    */
  def putAccount(
      id: AccountId,
      provisionedSpaceGb: ProvisionedSpaceGb,
      streamLimit: StreamLimit): ConnectionIO[Unit] = {
    val insertOp =
      sql"""
           INSERT INTO account (
             id,
             provisioned_space_gb,
             stream_limit
           ) VALUES (
             $id,
             $provisionedSpaceGb,
             $streamLimit
           ) ON DUPLICATE KEY UPDATE
             provisioned_space_gb=$provisionedSpaceGb,
             stream_limit=$streamLimit
           ;
         """
    insertOp.update.run.map(_ => ())
  }

  /** Looks up the account with the given id; `None` when no row matches. */
  def getAccount(id: AccountId): ConnectionIO[Option[Account]] = {
    val selectOp = sql"""
      SELECT
        id,
        stream_limit,
        provisioned_space_gb,
        schedule_version
      FROM account
      WHERE id=$id;
      """
    selectOp.query[Account].option
  }

  /**
    * Stores a recording: first the `recording` row, then the per-user options
    * row in `user_rec_opts`.
    */
  def createRecording(r: Recording): ConnectionIO[Unit] = {
    for {
      _ <- createAccountRecording(r)
      _ <- createUserRecOpts(r)
    } yield ()
  }

  /**
    * Inserts the `user_rec_opts` row for the recording's user.
    * `keep_for_user` is always written as `true`.
    *
    * @return the number of affected rows
    */
  def createUserRecOpts(r: Recording): ConnectionIO[Int] = {
    val insertOp =
      sql"""
           INSERT INTO user_rec_opts (
             recording_id,
             profile_id,
             extra_minutes,
             is_protected,
             keep_for_user
           ) VALUES (
             ${r.id},
             ${r.user.profileId},
             ${r.bonusTime},
             ${r.isProtected},
             true
           );
         """
    insertOp.update.run
  }

  /**
    * Inserts the `recording` row for the recording's account.
    *
    * `channel_version_id` and `series_id` are written as hard-coded stub
    * strings, and `schd_start_time` reuses the airing start (see the TODO).
    *
    * @return the number of affected rows
    */
  def createAccountRecording(r: Recording): ConnectionIO[Int] = {
    //TODO: save real values for channel_version_id, series_id,
    // schd_start_time, schd_end_time
    // NOTE(review): schd_end_time binds r.endTime while air_end_time binds
    // r.interval.end -- confirm that the difference is intended.
    val insertOp =
      sql"""
           INSERT INTO recording (
               id,
               acct_id,
               channel_id,
               channel_version_id,
               airing_id,
               title_id,
               series_id,
               air_start_time,
               air_end_time,
               schd_start_time,
               schd_end_time
             )
             VALUES (
                ${r.id},
                ${r.user.accountId},
                ${r.channelId},
                'channelVersionId_Stub',
                ${r.airingId},
                ${r.titleId},
                'seriesId_Stub',
                ${r.interval.start},
                ${r.interval.end},
                ${r.interval.start},
                ${r.endTime}
             );
         """
    insertOp.update.run
  }

  /**
    * Deletes the `recording` row with the given id. The affected-row count is
    * discarded, so deleting a missing id is not reported as an error.
    */
  def deleteRecording(rId: RecordingId): ConnectionIO[Unit] = {
    //Note that this deletes the recording and ALL user associations
    // NOTE(review): only `recording` is touched here, so the associations are
    // presumably removed by a cascading foreign key -- confirm in the schema.
    val updateOp = sql"""
      DELETE FROM recording
      WHERE id=${rId}
    """

    updateOp.update.run.map(_ => ())
  }

  /**
    * Returns the account's recordings whose `schd_end_time` is later than
    * `now`.
    */
  def getAcctFutRecordings(
      accountId: AccountId,
      now: Timestamp): ConnectionIO[Set[Recording]] = {
    // The s-interpolator only splices in the constant SELECT fragment; the
    // actual values are bound through the `?` placeholders.
    val query =
      Query[(AccountId, Timestamp), Recording](s"""
       ${selectRecordingFragment}
       WHERE r.acct_id = ? and r.schd_end_time > ?;
      """).toQuery0((accountId, now))

    query.to[Set]
  }

  /** Returns the account's recordings of the given title. */
  def getAcctRecordings(
      acctId: AccountId,
      titleId: TitleId): ConnectionIO[Set[Recording]] = {
    // The s-interpolator only splices in the constant SELECT fragment; the
    // actual values are bound through the `?` placeholders.
    val query = Query[(AccountId, TitleId), Recording](s"""
       ${selectRecordingFragment}
       WHERE r.acct_id = ? and r.title_id = ?;
      """).toQuery0((acctId, titleId))

    query.to[Set]
  }

  /**
    * Inserts a follow for `user` as described by `req`.
    *
    * @param id        identifier of the new follow (generated by the caller)
    * @param createdAt creation time written to the `created` column
    * @param user      supplies `acct_id` and `profile_id`
    * @param req       the follow details plus extra time and protection flag
    * @return `FollowId(id)`, regardless of the affected-row count
    */
  def createFollow(
      id: UUID,
      createdAt: Instant,
      user: User,
      req: CreateFollowRequest): ConnectionIO[FollowId] = {
    // Flatten the request detail into column values: `program_id` is the
    // title or series id as a string; `new_episodes_only` is only set for
    // series follows.
    val (followType, programId, titleOpt, seriesOpt, neoOpt) =
      req.detail match {
        case tf: TitleFollowRequest =>
          (FollowType.Title,
           tf.titleId.asString,
           Some(tf.titleId),
           tf.seriesId,
           None)
        case sf: SeriesFollowRequest =>
          (FollowType.Series,
           sf.seriesId.asString,
           None,
           Some(sf.seriesId),
           Some(sf.newEpisodesOnly))
      }

    val insertOp = sql"""
    INSERT INTO follow (
      id,
      acct_id,
      profile_id,
      follow_type,
      program_id,
      series_id,
      title_id,
      new_episodes_only,
      extra_minutes,
      is_protected,
      created
    )
    VALUES (
      ${id},
      ${user.accountId},
      ${user.profileId},
      ${followType},
      ${programId},
      ${seriesOpt},
      ${titleOpt},
      ${neoOpt},
      ${req.extraTime},
      ${req.isProtected},
      ${createdAt}
      );
    """
    insertOp.update.run.map(_ => FollowId(id))
  }

  /**
    * Looks up the follow with the given id; `None` when no row matches.
    * The selected column order must stay in sync with [[followComposite]].
    */
  def getFollow(id: FollowId): ConnectionIO[Option[Follow]] = {
    val selectOp = sql"""
      SELECT
        id,
        follow_type,
        program_id,
        series_id,
        title_id,
        extra_minutes,
        new_episodes_only,
        is_protected
      FROM follow
      WHERE id = $id
      """
    selectOp.query[Follow].option
  }

  /** Not implemented yet: throws `scala.NotImplementedError` when called. */
  def updateFollow(
      id: FollowId,
      extraTime: ExtraTime,
      isProtected: IsProtected,
      newEpisodesOnly: NewEpisodesOnly): ConnectionIO[Unit] = ???

  /** Not implemented yet: throws `scala.NotImplementedError` when called. */
  def deleteFollow(id: FollowId): ConnectionIO[Unit] = ???

  /**
    * Upserts a DVR row: inserts it, or on a duplicate key updates every
    * column except `id` and `acct_id`. The WAN IP is converted with
    * `INET_ATON` on write. The affected-row count is discarded.
    */
  def putDvr(d: Dvr): ConnectionIO[Unit] = {
    val insertOp = sql"""
    INSERT INTO dvr (
      id,
      acct_id,
      host_name,
      wan_ip,
      wan_port,
      drive_used_bytes,
      status
    )
    VALUES (
      ${d.id},
      ${d.accountId},
      ${d.hostName},
      INET_ATON(${d.wanIp}),
      ${d.wanPort},
      ${d.usedBytes},
      ${d.status}
      ) ON DUPLICATE KEY UPDATE
      host_name = ${d.hostName},
      wan_ip = INET_ATON(${d.wanIp}),
      wan_port = ${d.wanPort},
      drive_used_bytes = ${d.usedBytes},
      status = ${d.status}
      ;
    """
    insertOp.update.run.map(_ => ())
  }

  /**
    * Looks up the DVR belonging to the given account; `None` when no row
    * matches. The WAN IP is converted back with `INET_NTOA` on read.
    */
  def getDvr(acctId: AccountId): ConnectionIO[Option[Dvr]] = {
    val selectOp = sql"""
      SELECT
        id,
        acct_id,
        host_name,
        INET_NTOA(wan_ip),
        wan_port,
        drive_used_bytes,
        status
      FROM dvr
      WHERE acct_id = $acctId
      """
    selectOp.query[Dvr].option
  }

  /**
    * Deletes the DVR row with the given id. The affected-row count is
    * discarded, so deleting a missing id is not reported as an error.
    */
  def deleteDvr(id: DeviceId): ConnectionIO[Unit] = {
    val deleteOp = sql"""
      DELETE FROM dvr
      WHERE id=${id}
    """
    deleteOp.update.run.map(_ => ())
  }

}