Script to automatically pull movies/shows from top lists on trakt.tv and add them to ombi so it can send them to sonarr/radarr.



Tired of adding movies/shows manually in ombi/sonarr/radarr? Here is a script that fetches movies/shows from the top trakt.tv lists ("popular", "recommended", "anticipated", "trending", "boxoffice") and adds them to ombi. I'm by no means a coding expert; this was all written with the help of ChatGPT. :)

 

I know about traktarr, but I wanted something integrated with ombi so I thought why not make it myself and maybe learn some things along the way.

 

You can set the limit for how many movies/shows it should pull from each list in the .env file (if not set, it defaults to 5). The script checks the items you have already requested in ombi and skips them if they show up in the lists it pulls from trakt. Some of the lists stay fairly static, like "popular", while others, like "anticipated", change more frequently. That's why you can set the limit per list. I haven't tested the script with a limit above 22 for any list. Right now the script only pulls the first page of each list, so it should be about 30ish items max per list. This suits my needs, but it wouldn't be hard to make it check more pages if you want to.

I run my script once a week. I thought about making the script push a notification to unraid saying how many movies/shows were requested, but that's for the next version. I have email notifications set up in ombi, so I get all the requested items in my inbox anyway, but a total number of requests would be nice.
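Pulling more than the first page could look roughly like the sketch below. It is not part of the script: fetch_all_pages and fetch_page are made-up names, and fetch_page(page) is assumed to wrap the script's own trakt call with the page number filled in and return that page's list of items. The loop stops at the first empty page or after max_pages, whichever comes first.

```python
def fetch_all_pages(fetch_page, max_pages=3):
    """Collect items from pages 1..max_pages, stopping early on an empty page.

    fetch_page is a hypothetical callable: it takes a page number and
    returns the list of items for that page.
    """
    items = []
    for page in range(1, max_pages + 1):
        page_items = fetch_page(page)
        if not page_items:
            break  # an empty page means there is nothing left to fetch
        items.extend(page_items)
    return items


# Offline stand-in for the trakt call, so the sketch runs without an API key
fake_pages = {1: ["a", "b"], 2: ["c"]}
print(fetch_all_pages(lambda page: fake_pages.get(page, [])))
```

Keeping the HTTP call behind a callable means the paging logic can be tried without touching the network.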

 

You need a few things to run this:
1. If you plan to run the script directly on unraid you need Nerd tools from CA to install python3 and pip. 
2. The user script plugin from CA
3. A free trakt.tv account, to get the API key for trakt.
4. A reverse proxy and a domain.
5. Create a .env file with the following content and save it in a folder on the flash drive (or wherever you're running the script from). Maybe the Extras folder if you have it. Or simply create a folder called scripts. Place the .env file and the script in the same folder. Remember to replace the placeholders with your info.

 

TRAKT_ENDPOINT=https://api.trakt.tv
TRAKT_API_KEY=YOUR_TRAKT_API_KEY

OMBI_MOVIE_ENDPOINT=https://YOUR_DOMAIN/api/v1/Request/movie
OMBI_MOVIE_API_KEY=OMBI_API_KEY

OMBI_TV_ENDPOINT=https://YOUR_DOMAIN/api/v2/Requests/tv
OMBI_TV_REQUESTS_ENDPOINT=https://YOUR_DOMAIN/api/v1/Request/tv?status=Available&status=Processing

OMBI_TV_API_KEY=OMBI_API_KEY
OMBI_USER=YOUR_OMBI_USERNAME

MOVIE_POPULAR_LIMIT=
MOVIE_RECOMMENDED_LIMIT=
MOVIE_ANTICIPATED_LIMIT=
MOVIE_TRENDING_LIMIT=
MOVIE_BOXOFFICE_LIMIT=

SHOW_POPULAR_LIMIT=
SHOW_RECOMMENDED_LIMIT=
SHOW_ANTICIPATED_LIMIT=
SHOW_TRENDING_LIMIT=
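
A missing key or endpoint otherwise only shows up later as a failed request, so a small check right after the .env file is loaded can catch it early. This is just a sketch and not part of the script: missing_settings is a made-up helper, the list of names is copied from the template above, and the placeholder test only recognises values that still contain "YOUR_".

```python
import os

# Names taken from the .env template; the limits are optional, so they are left out
REQUIRED_SETTINGS = [
    "TRAKT_ENDPOINT", "TRAKT_API_KEY",
    "OMBI_MOVIE_ENDPOINT", "OMBI_MOVIE_API_KEY",
    "OMBI_TV_ENDPOINT", "OMBI_TV_REQUESTS_ENDPOINT",
    "OMBI_TV_API_KEY", "OMBI_USER",
]


def missing_settings(env=os.environ, required=REQUIRED_SETTINGS):
    """Return the names that are unset, empty, or still a YOUR_... placeholder."""
    missing = []
    for name in required:
        value = (env.get(name) or "").strip()
        if not value or "YOUR_" in value:
            missing.append(name)
    return missing


# Example with a plain dict instead of the real environment
example = {"TRAKT_ENDPOINT": "https://api.trakt.tv", "TRAKT_API_KEY": "YOUR_TRAKT_API_KEY"}
print(missing_settings(example))
```

In the real script this would be called once after load_dotenv(), exiting with a message if the returned list is not empty.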

 

6. You need the following python dependencies; install them with pip. If you're running directly on unraid, create a user script with the following and set it to run at array start.

 

#!/bin/bash
# Only third-party packages need installing; logging, os, datetime and time ship with python
pip install requests python-dotenv

 

7. And the script itself, name it what_you_want.py

 

import requests
import logging
import os
import time
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from dotenv import load_dotenv

# Load environment variables from .env file
load_dotenv()

# Set up logging: messages go to the console and to ombi_requests.log in the same format
log_format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
logging.basicConfig(level=logging.DEBUG, format=log_format)
log_file = "ombi_requests.log"
# FileHandler creates the log file if it does not exist yet
file_handler = logging.FileHandler(log_file)
file_handler.setLevel(logging.DEBUG)
file_handler.setFormatter(logging.Formatter(log_format))
logging.getLogger().addHandler(file_handler)

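# Helper function to get limit values from environment variables.
# A variable that is missing or left empty (e.g. "MOVIE_POPULAR_LIMIT=") falls back to the default.
def get_limit_from_env(var_name, default):
    value = os.getenv(var_name)
    if value is None or value.strip() == "":
        return int(default)
    try:
        return int(value)
    except ValueError:
        logging.error(f"Environment variable {var_name} must be an integer. Using default value: {default}")
        return int(default)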

# Define the list names to pull from Trakt.tv for movies and shows separately
movie_list_names = ["popular", "recommended", "anticipated", "trending", "boxoffice"]
show_list_names = ["popular", "recommended", "anticipated", "trending"] 

# Load limit values from environment variables
movie_limits = {name: get_limit_from_env(f"MOVIE_{name.upper()}_LIMIT", '5') for name in movie_list_names}
show_limits = {name: get_limit_from_env(f"SHOW_{name.upper()}_LIMIT", '5') for name in show_list_names}

requested_movies = []
requested_shows = []

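# Create a retry strategy: retry only on rate limiting and server-side errors.
# Client errors such as 401 or 404 will not succeed on a retry, so they fail right away.
retry_strategy = Retry(
    total=3,
    status_forcelist=[429, 500, 502, 503, 504],
    allowed_methods=["GET", "POST"],
    backoff_factor=2,
    raise_on_status=False  # hand back the last response so raise_for_status() reports the error
)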
adapter = HTTPAdapter(max_retries=retry_strategy)
http = requests.Session()
http.mount("https://", adapter)
http.mount("http://", adapter)

# Delay time between requests
delay_time = 15

# Set up the Trakt.tv API endpoint and headers
trakt_endpoint = os.getenv('TRAKT_ENDPOINT')
trakt_headers = {
    "Content-Type": "application/json",
    "trakt-api-key": os.getenv('TRAKT_API_KEY'),
    "trakt-api-version": "2",
}

# Set up the Ombi API endpoint and headers
ombi_movie_endpoint = os.getenv('OMBI_MOVIE_ENDPOINT')
ombi_movie_headers = {
    "Content-Type": "application/json",
    "ApiKey": os.getenv('OMBI_MOVIE_API_KEY'),
    "UserName": os.getenv('OMBI_USER'),
}

ombi_tv_endpoint = os.getenv('OMBI_TV_ENDPOINT')
ombi_tv_requests_endpoint = os.getenv('OMBI_TV_REQUESTS_ENDPOINT')
ombi_tv_headers = {
    "Content-Type": "application/json",
    "ApiKey": os.getenv('OMBI_TV_API_KEY'),
    "UserName": os.getenv('OMBI_USER'),
    "OmbiVersion": "3",
}

# Function to fetch existing requests from Ombi
def fetch_requests(endpoint, headers, request_list):
    try:
        response = http.get(endpoint, headers=headers)
        response.raise_for_status()
        if "json" in response.headers.get("Content-Type", ""):
            requests_data = response.json()
            for request in requests_data:
                ids = {
                    'tvDbId': request.get('tvDbId'),
                    'theMovieDbId': request.get('theMovieDbId')
                }
                request_list.append(ids)
    except ValueError as err:
        logging.error("Failed to decode response as JSON: %s", err)
    except requests.exceptions.RequestException as err:
        # Covers HTTP error codes as well as connection failures and timeouts
        logging.error("Failed to get current requests from Ombi: %s", err)

# Function to request an item from Ombi
def request_item(endpoint, headers, item, item_type, request_list):
    tmdb_id = item[item_type]["ids"].get("tmdb")
    tvdb_id = item[item_type]["ids"].get("tvdb")

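    # Check if the item has already been requested by either ID.
    # IDs that are missing (None) are skipped, otherwise None == None would match every entry.
    already_requested = any(
        (tvdb_id is not None and req.get('tvDbId') == tvdb_id)
        or (tmdb_id is not None and req.get('theMovieDbId') == tmdb_id)
        for req in request_list
    )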
    if already_requested:
        logging.debug("%s '%s' already requested", item_type.capitalize(), item[item_type]['title'])
        return

    # Prepare the data payload for the request
    data = {"theMovieDbId": tmdb_id} if item_type == "movie" else {"theMovieDbId": tmdb_id, "requestAll": True}

    # Make the POST request to Ombi
    try:
        response = http.post(endpoint, headers=headers, json=data)
        response.raise_for_status()
    except requests.exceptions.RequestException as err:
        # RequestException also covers connection errors and timeouts, not just HTTP error codes
        logging.error("Failed to request %s '%s' from Ombi: %s", item_type, item[item_type]['title'], err)
        return

    # Log the successful request and append the requested ID to the list
    logging.info("Requested %s: %s", item_type, item[item_type]['title'])
    # Append the ID used for requesting to the list to prevent future duplicates
    request_list.append({'tvDbId': tvdb_id, 'theMovieDbId': tmdb_id})
    time.sleep(delay_time)

# Fetch existing movie and TV show requests
fetch_requests(ombi_movie_endpoint, ombi_movie_headers, requested_movies)
fetch_requests(ombi_tv_requests_endpoint, ombi_tv_headers, requested_shows)

# Process movie lists
for list_name in movie_list_names:
    limit = movie_limits[list_name]
    list_endpoint = f"{trakt_endpoint}/movies/{list_name}?extended=full&page=1&limit={limit}"
    try:
        response = http.get(list_endpoint, headers=trakt_headers)
        response.raise_for_status()
        results = response.json()
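        for item in results:
            # Some Trakt lists (e.g. "popular") return bare movie objects; wrap them so they are not skipped
            if "movie" not in item and "ids" in item:
                item = {"movie": item}
            if "movie" in item:
                request_item(ombi_movie_endpoint, ombi_movie_headers, item, "movie", requested_movies)
    except requests.exceptions.RequestException as err:
        logging.error("Failed to get list of movies from Trakt.tv: %s", err)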

# Process show lists
for list_name in show_list_names:
    limit = show_limits[list_name]
    list_endpoint = f"{trakt_endpoint}/shows/{list_name}?extended=full&page=1&limit={limit}"
    try:
        response = http.get(list_endpoint, headers=trakt_headers)
        response.raise_for_status()
        results = response.json()
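        for item in results:
            # Some Trakt lists (e.g. "popular") return bare show objects; wrap them so they are not skipped
            if "show" not in item and "ids" in item:
                item = {"show": item}
            if "show" in item:
                request_item(ombi_tv_endpoint, ombi_tv_headers, item, "show", requested_shows)
    except requests.exceptions.RequestException as err:
        logging.error("Failed to get list of shows from Trakt.tv: %s", err)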

 

8. You can create a user script to run it once a week or as often as you like. The content of the user script should just be: python3 /boot/scripts/name_of_your_script.py (replace the path with your own).
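
Before putting it on a schedule, it can help to see what a run would actually request. The sketch below replays the duplicate check offline on hand-written sample data, simplified to the TMDB ID only. items_to_request is a made-up helper, and the shape of the sample items (a "movie" key holding "title" and "ids") is an assumption based on how the script reads trakt's response. The length of the result would also give the total number of requests mentioned earlier.

```python
def items_to_request(items, item_type, already_requested):
    """Return the titles of items whose TMDB ID has not been requested yet."""
    seen = {req.get("theMovieDbId") for req in already_requested}
    titles = []
    for item in items:
        entry = item.get(item_type, {})
        tmdb_id = entry.get("ids", {}).get("tmdb")
        if tmdb_id is None or tmdb_id in seen:
            continue  # no usable ID, or already requested
        seen.add(tmdb_id)  # the same title can appear in several lists
        titles.append(entry.get("title"))
    return titles


# Made-up sample data standing in for a trakt list and the existing ombi requests
sample_items = [
    {"movie": {"title": "Movie A", "ids": {"tmdb": 1}}},
    {"movie": {"title": "Movie B", "ids": {"tmdb": 2}}},
    {"movie": {"title": "Movie A", "ids": {"tmdb": 1}}},
]
sample_requests = [{"theMovieDbId": 2}]

new_titles = items_to_request(sample_items, "movie", sample_requests)
print(len(new_titles), new_titles)
```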
