Learning_From_Gazelle
Req: we are building a service gazelle (API) fully assocaited with user context.
Dealing with Database as well.
Gazelle operations:
1. getFollow,
2. createFollow.
class Application and it should have one transformation which takes gazelle operation and returns
Kleisli[Task,UserCtx,A] which means it returns a function which takes Task[UserCtx] and
gives Task[A] with respect to that operation.
Because Each and every operation depends on UserCtx.
package object gazelle {
type UsrCtxK[A] = Kleisli[Task, UsrCtx, A]
type GazelleService = GazelleOp ~> UsrCtxK
type DbOpF[A] = Free[DbOp, A]
implicit def logOps(log: Logger): LogOps = LogOps(log)
}
final case class UsrCtx(
user: User,
deviceId: DeviceId,
transactionId: TransactionId
)
GazelleOp is an Algorithm.
When we have interpreter inside an object, we can get the interpreter result using this:
val p = app.gzSvc(CreateFollow(follow))
p is UsrCtxK. //Kleisli[Task, UserCtx, A]
When we run the above code with UserCtx, will get Task[A]
val p1 = p.run(ctx)
p1 is Task[CreateFollowResponse]
Ultimately, we need gazelleOp ~> A. But we won't get unless we have userCtx.
So it is better to have interpreter like gazelleOp ~> Kleisli[Task, UserCtx, A]. Amazing right :) !!!
so liftKleisli is used to lift the computation into Kleisli.
Example:
val p:Task[Response] = ???
val q = p.liftKleisli[UsrCtx]
q //Kleisli[Task, UsrCtx, Response]
package gazelle
import scalaz.~>
import doobie.imports._
import doobie.hi.connection._
import DbOp._
import java.util.UUID
import java.time.Instant
import scala.concurrent.duration._
import peddler.{SeriesId, TitleId}
import gazelle.scheduler.Timestamp
object MySqlInterpreter extends (DbOp ~> ConnectionIO) {
implicit val instantMeta: Meta[Instant] =
Meta[Long].xmap(Instant.ofEpochMilli, _.toEpochMilli)
implicit val uuidMeta: Meta[UUID] =
Meta[String].nxmap(UUID.fromString, _.toString)
implicit val followTypeMeta: Meta[FollowType] =
Meta[String].nxmap(FollowType.fromString, _.asString)
implicit val extraTimeMeta: Meta[ExtraTime] =
Meta[Int].xmap(x => ExtraTime(x minutes), _.duration.toMinutes.toInt)
implicit val followComposite: Composite[Follow] =
Composite[
(FollowId,
FollowType,
String,
Option[SeriesId],
Option[TitleId],
Option[ExtraTime],
Option[NewEpisodesOnly],
IsProtected)].xmap({
case (fid, FollowType.Title, _, sId, Some(tid), et, _, ip) =>
TitleFollow(fid, tid, sId, et, ip)
case (fid, FollowType.Series, _, Some(sid), _, et, Some(neo), ip) =>
SeriesFollow(fid, sid, neo, et, ip)
case _ =>
throw new IllegalArgumentException(
"NewEpisodesOnly must be defined for SeriesFollow")
}, {
case TitleFollow(fid, tId, sId, et, ip) =>
(fid,
FollowType.Title: FollowType,
tId.asString,
sId,
Some(tId),
et,
None,
ip)
case SeriesFollow(fid, sId, neo, extraTime, isProtected) =>
(fid,
FollowType.Series: FollowType,
sId.asString,
Some(sId),
None,
extraTime,
Some(neo),
isProtected)
})
implicit val dvrStatusMeta: Meta[DvrStatus] =
Meta[String].nxmap(DvrStatus.fromString, _.asString)
def apply[A](op: DbOp[A]): ConnectionIO[A] = op match {
case PutAccount(acctId, provisionedSpaceGb, streamLimit) =>
putAccount(acctId, provisionedSpaceGb, streamLimit)
case GetAccount(acctId) => getAccount(acctId)
case CreateFollow(user, f) =>
for {
followId <- delay(UUID.randomUUID)
created <- delay(Instant.now())
res <- createFollow(followId, created, user, f)
} yield res
case CreateRecordings(recs) => ???
case CreateRecording(r) => createRecording(r)
case DiskSpaceConflict(acctId) => ???
case GetFollow(id) => getFollow(id)
case GetSeriesFollow(user, seriesId) => ???
case UpdateFollow(id, extraTime, isProtected, newEpisodesOnly) =>
updateFollow(id, extraTime, isProtected, newEpisodesOnly)
case GetTitleFollow(user, titleId) => ???
case DeleteFollow(id) => deleteFollow(id)
case GetAcctFutRecordings(acct, now) => getAcctFutRecordings(acct, now)
case GetAcctRecordings(acctId, titleId) =>
getAcctRecordings(acctId, titleId)
case GetRecordingsBySeriesOrTitle(sot) => ???
case GetAcctUsedSpace(acctId) => ???
case GetEstimatedFutureSpace(acctId) => ???
case GetAcctTotalSpace(acctId) => ???
case GetRecordings(user, titles) => ???
case RemoveRecording(rId) => deleteRecording(rId)
case UpdateRecording(rec) => ???
case PutDvr(dvr) => putDvr(dvr)
case GetDvr(acctId) => getDvr(acctId)
case DeleteDvr(dvrId) => deleteDvr(dvrId)
}
private val selectRecordingFragment =
"""
SELECT
r.id,
r.acct_id,
ro.profile_id,
r.title_id,
r.airing_id,
r.channel_id,
true,
r.air_start_time,
r.air_end_time,
ro.extra_minutes,
ro.is_protected
FROM recording r
INNER JOIN user_rec_opts ro
ON r.id = ro.recording_id
"""
def putAccount(
id: AccountId,
provisionedSpaceGb: ProvisionedSpaceGb,
streamLimit: StreamLimit): ConnectionIO[Unit] = {
val insertOp =
sql"""
INSERT INTO account (
id,
provisioned_space_gb,
stream_limit
) VALUES (
$id,
$provisionedSpaceGb,
$streamLimit
) ON DUPLICATE KEY UPDATE
provisioned_space_gb=$provisionedSpaceGb,
stream_limit=$streamLimit
;
"""
insertOp.update.run.map(_ => ())
}
def getAccount(id: AccountId): ConnectionIO[Option[Account]] = {
val selectOp = sql"""
SELECT
id,
stream_limit,
provisioned_space_gb,
schedule_version
FROM account
WHERE id=$id;
"""
selectOp.query[Account].option
}
def createRecording(r: Recording): ConnectionIO[Unit] = {
for {
_ <- createAccountRecording(r)
_ <- createUserRecOpts(r)
} yield ()
}
def createUserRecOpts(r: Recording): ConnectionIO[Int] = {
val insertOp =
sql"""
INSERT INTO user_rec_opts (
recording_id,
profile_id,
extra_minutes,
is_protected,
keep_for_user
) VALUES (
${r.id},
${r.user.profileId},
${r.bonusTime},
${r.isProtected},
true
);
"""
insertOp.update.run
}
def createAccountRecording(r: Recording): ConnectionIO[Int] = {
//TODO: save real values for channel_version_id, series_id,
// schd_start_time, schd_end_time
val insertOp =
sql"""
INSERT INTO recording (
id,
acct_id,
channel_id,
channel_version_id,
airing_id,
title_id,
series_id,
air_start_time,
air_end_time,
schd_start_time,
schd_end_time
)
VALUES (
${r.id},
${r.user.accountId},
${r.channelId},
'channelVersionId_Stub',
${r.airingId},
${r.titleId},
'seriesId_Stub',
${r.interval.start},
${r.interval.end},
${r.interval.start},
${r.endTime}
);
"""
insertOp.update.run
}
def deleteRecording(rId: RecordingId): ConnectionIO[Unit] = {
//Note that this deletes the recording and ALL user associations
val updateOp = sql"""
DELETE FROM recording
WHERE id=${rId}
"""
updateOp.update.run.map(_ => ())
}
def getAcctFutRecordings(
accountId: AccountId,
now: Timestamp): ConnectionIO[Set[Recording]] = {
val query =
Query[(AccountId, Timestamp), Recording](s"""
${selectRecordingFragment}
WHERE r.acct_id = ? and r.schd_end_time > ?;
""").toQuery0((accountId, now))
query.to[Set]
}
def getAcctRecordings(
acctId: AccountId,
titleId: TitleId): ConnectionIO[Set[Recording]] = {
val query = Query[(AccountId, TitleId), Recording](s"""
${selectRecordingFragment}
WHERE r.acct_id = ? and r.title_id = ?;
""").toQuery0((acctId, titleId))
query.to[Set]
}
def createFollow(
id: UUID,
createdAt: Instant,
user: User,
req: CreateFollowRequest): ConnectionIO[FollowId] = {
val (followType, programId, titleOpt, seriesOpt, neoOpt) =
req.detail match {
case tf: TitleFollowRequest =>
(FollowType.Title,
tf.titleId.asString,
Some(tf.titleId),
tf.seriesId,
None)
case sf: SeriesFollowRequest =>
(FollowType.Series,
sf.seriesId.asString,
None,
Some(sf.seriesId),
Some(sf.newEpisodesOnly))
}
val insertOp = sql"""
INSERT INTO follow (
id,
acct_id,
profile_id,
follow_type,
program_id,
series_id,
title_id,
new_episodes_only,
extra_minutes,
is_protected,
created
)
VALUES (
${id},
${user.accountId},
${user.profileId},
${followType},
${programId},
${seriesOpt},
${titleOpt},
${neoOpt},
${req.extraTime},
${req.isProtected},
${createdAt}
);
"""
insertOp.update.run.map(_ => FollowId(id))
}
def getFollow(id: FollowId): ConnectionIO[Option[Follow]] = {
val selectOp = sql"""
SELECT
id,
follow_type,
program_id,
series_id,
title_id,
extra_minutes,
new_episodes_only,
is_protected
FROM follow
WHERE id = $id
"""
selectOp.query[Follow].option
}
def updateFollow(
id: FollowId,
extraTime: ExtraTime,
isProtected: IsProtected,
newEpisodesOnly: NewEpisodesOnly): ConnectionIO[Unit] = ???
def deleteFollow(id: FollowId): ConnectionIO[Unit] = ???
def putDvr(d: Dvr): ConnectionIO[Unit] = {
val insertOp = sql"""
INSERT INTO dvr (
id,
acct_id,
host_name,
wan_ip,
wan_port,
drive_used_bytes,
status
)
VALUES (
${d.id},
${d.accountId},
${d.hostName},
INET_ATON(${d.wanIp}),
${d.wanPort},
${d.usedBytes},
${d.status}
) ON DUPLICATE KEY UPDATE
host_name = ${d.hostName},
wan_ip = INET_ATON(${d.wanIp}),
wan_port = ${d.wanPort},
drive_used_bytes = ${d.usedBytes},
status = ${d.status}
;
"""
insertOp.update.run.map(_ => ())
}
def getDvr(acctId: AccountId): ConnectionIO[Option[Dvr]] = {
val selectOp = sql"""
SELECT
id,
acct_id,
host_name,
INET_NTOA(wan_ip),
wan_port,
drive_used_bytes,
status
FROM dvr
WHERE acct_id = $acctId
"""
selectOp.query[Dvr].option
}
def deleteDvr(id: DeviceId): ConnectionIO[Unit] = {
val deleteOp = sql"""
DELETE FROM dvr
WHERE id=${id}
"""
deleteOp.update.run.map(_ => ())
}
}