This is my attempt to get Alexa to return Plex's On Deck and Recently Downloaded lists. It's not the prettiest, but Plex's API isn't the best at the moment. Step-by-step blog post may be found here: http://mlapida.com/thoughts/plex-alexa-interacting-with-your-media-server-through-voice
from __future__ import print_function
import urllib
import urllib2
import xml.etree.ElementTree
import logging
#enable basic logging to CloudWatch Logs
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def lambda_handler(event, context):
print("event.session.application.applicationId=" +
event['session']['application']['applicationId'])
#only run if requested by a specific app ID (past your app ID below)
if (event['session']['application']['applicationId'] !=
"amzn1.echo-sdk-ams.app.[AppIDHere]"):
raise ValueError("Invalid Application ID")
if event['session']['new']:
on_session_started({'requestId': event['request']['requestId']},
event['session'])
if event['request']['type'] == "LaunchRequest":
return on_launch(event['request'], event['session'])
elif event['request']['type'] == "IntentRequest":
return on_intent(event['request'], event['session'])
elif event['request']['type'] == "SessionEndedRequest":
return on_session_ended(event['request'], event['session'])
def on_session_started(session_started_request, session):
#called when the session starts
#nothing to do here
print("on_session_started requestId=" + session_started_request['requestId']
+ ", sessionId=" + session['sessionId'])
def on_launch(launch_request, session):
#called when the user launches the skill without specifying what they want
print("on_launch requestId=" + launch_request['requestId'] +
", sessionId=" + session['sessionId'])
# Dispatch to your skill's launch
return get_welcome_response()
def on_intent(intent_request, session):
#called when the user specifies an intent for this skill
print("on_intent requestId=" + intent_request['requestId'] +
", sessionId=" + session['sessionId'])
intent = intent_request['intent']
intent_name = intent_request['intent']['name']
#Plex Credentials - Please Fill In
username = "[username here]"
password = "[password]"
authKey = plex_login(username,password)
# Dispatch to your skill's intent handlers
if intent_name == "Plex":
return plex_list_on_desk(intent, session, authKey)
elif intent_name == "OnDeck":
return plex_list_on_desk(intent, session, authKey)
elif intent_name == "Download":
return plex_list_download(intent, session, authKey)
elif intent_name == "AMAZON.HelpIntent":
return get_welcome_response()
else:
raise ValueError("Invalid intent")
def on_session_ended(session_ended_request, session):
#Called when the user ends the session
print("on_session_ended requestId=" + session_ended_request['requestId'] +
", sessionId=" + session['sessionId'])
# add cleanup logic here
# --------------- Functions that control the skill's behavior ------------------
def get_welcome_response():
#the standard welcome message
session_attributes = {}
card_title = "Welcome"
speech_output = "Welcome to the Plex Skill. " \
"You can requet items on deck " \
"or list downloads"
# If the user either does not reply to the welcome message or says something
# that is not understood, they will be prompted again with this text.
reprompt_text = "Welcome to the Plex Skill. " \
"You can requet items on deck " \
"or list downloads"
should_end_session = False
return build_response(session_attributes, build_speechlet_response(
card_title, speech_output, reprompt_text, should_end_session))
def plex_list_on_desk(intent, session, authKey):
#return on deck TV shows
card_title = intent['name']
session_attributes = {}
should_end_session = True
#call the on deck function
OnDeck = ret_on_deck(str(find_plex_server(authKey)),authKey)
speech_output = OnDeck
reprompt_text = OnDeck
return build_response(session_attributes, build_speechlet_response(
card_title, speech_output, reprompt_text, should_end_session))
def plex_list_download(intent, session, authKey):
#return recently downloaded
card_title = intent['name']
session_attributes = {}
should_end_session = True
#call the recently downloaded function
OnDeck = ret_download(str(find_plex_server(authKey)),authKey)
speech_output = OnDeck
reprompt_text = OnDeck
return build_response(session_attributes, build_speechlet_response(
card_title, speech_output, reprompt_text, should_end_session))
# --------------- Helpers that build all of the responses ----------------------
def build_speechlet_response(title, output, reprompt_text, should_end_session):
#creates the JSON payload for Alexa
return {
'outputSpeech': {
'type': 'PlainText',
'text': output
},
'card': {
'type': 'Simple',
'title': 'SessionSpeechlet - ' + title,
'content': 'SessionSpeechlet - ' + output
},
'reprompt': {
'outputSpeech': {
'type': 'PlainText',
'text': reprompt_text
}
},
'shouldEndSession': should_end_session
}
def build_response(session_attributes, speechlet_response):
return {
'version': '1.0',
'sessionAttributes': session_attributes,
'response': speechlet_response
}
# --------------- Functions specific to Plex ----------------------
def find_plex_server(authKey):
#Select the first server you find
url = "https://plex.tv/devices.xml?X-Plex-Token=" + authKey
try:
request = urllib2.Request(url)
result = urllib2.urlopen(request)
e = xml.etree.ElementTree.fromstring(result.read())
x = 0
#[TODO]: There should be a better way to do this. Check the API docs
for atype in e.findall('Device'):
if atype.get('provides') == "server":
for conns in atype.findall('Connection'):
if x == 0:
return(conns.get('uri'))
x += 1
except urllib2.URLError, e:
print(e)
def ret_on_deck(url,authKey):
#returns a list of TV Shows that are "on deck"
MainServerURL = url + "/library/onDeck?X-Plex-Token=" + authKey
s = "TV Shows On Deck: \n"
try:
request = urllib2.Request(MainServerURL)
result = urllib2.urlopen(request)
e = xml.etree.ElementTree.fromstring(result.read())
#look for the TV shows only
for atype in e.findall('Video'):
if atype.get('librarySectionTitle') == "TV Shows":
s += atype.get('grandparentTitle').split("(")[0].strip() + ". \n"
return s
except urllib2.URLError, e:
print(e)
def ret_download(url,authKey):
#returns a list of downloads
MainServerURL = url + "/library/recentlyAdded?X-Plex-Token=" + authKey
#start the string
s = "Recently Added: \n"
#number of shows to list
c = 5
try:
request = urllib2.Request(MainServerURL)
result = urllib2.urlopen(request)
e = xml.etree.ElementTree.fromstring(result.read())
t = 0
m = 0
#search for TV shows
for atype in e.findall('Directory'):
if t == 0:
s = s + "In TV: \n"
if atype.get('type') == "season" and t < c:
s += atype.get('parentTitle').split("(")[0].strip() + ". \n"
t += 1
#search for Movies
for atype in e.findall('Video'):
if m == 0:
s += "In Movies: \n"
if atype.get('type') == "movie" and m < c:
s += atype.get('title').split("(")[0].strip() + ". \n"
m += 1
return s
except urllib2.URLError, e:
print(e)
def plex_login(username,password):
#returns a Plex auth token
try:
url = "https://plex.tv/users/sign_in.xml"
headers = {
'x-plex-device-name': "AWS Lambda",
'x-plex-device': "AWSv01",
'x-plex-client-identifier': "049ouolknf9u42oihen"
}
values = {
'user[login]' : username,
'user[password]': password
}
data = urllib.urlencode(values)
req = urllib2.Request(url,data,headers)
response = urllib2.urlopen(req)
e = xml.etree.ElementTree.fromstring(response.read())
return(e.get('authenticationToken'))
except urllib2.URLError, e:
print(e)