Source code for the_data_packet.generation.rss
"""RSS feed generation and management for podcast episodes."""
import xml.etree.ElementTree as ET
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import List, Optional
from the_data_packet.core.config import Config, get_config
from the_data_packet.core.exceptions import TheDataPacketError
from the_data_packet.core.logging import get_logger
from the_data_packet.sources.base import Article
from the_data_packet.utils.s3 import S3Storage, S3UploadResult
logger = get_logger(__name__)
[docs]
@dataclass
class PodcastEpisode:
"""Represents a podcast episode for RSS feed."""
title: str
description: str
audio_url: str
pub_date: datetime
episode_number: Optional[int] = None
duration: Optional[str] = None # Format: "HH:MM:SS"
file_size: Optional[int] = None # In bytes
guid: Optional[str] = None
author: Optional[str] = None
[docs]
def __post_init__(self) -> None:
"""Generate GUID if not provided."""
if not self.guid:
# Generate unique GUID based on title and date
date_str = self.pub_date.strftime("%Y%m%d")
self.guid = f"{date_str}-{self.title.lower().replace(' ', '-')}"
[docs]
@dataclass
class RSSGenerationResult:
"""Result of RSS feed generation."""
success: bool = False
rss_content: Optional[str] = None
local_path: Optional[Path] = None
s3_url: Optional[str] = None
error_message: Optional[str] = None
[docs]
class RSSGenerator:
"""Generates and manages RSS feeds for podcast episodes."""
[docs]
def __init__(self, config: Optional[Config] = None) -> None:
"""Initialize RSS generator."""
self.config = config or get_config()
self.s3_storage: Optional[S3Storage] = None
def _get_next_episode_number(self, existing_episodes: List[PodcastEpisode]) -> int:
"""Determine the next episode number based on existing episodes."""
if not existing_episodes:
return 1
# Find the highest episode number
max_episode = 0
for episode in existing_episodes:
if episode.episode_number and episode.episode_number > max_episode:
max_episode = episode.episode_number
return max_episode + 1
[docs]
def generate_episode_from_articles(
self,
articles: List[Article],
audio_url: str,
episode_number: Optional[int] = None,
duration: Optional[str] = None,
file_size: Optional[int] = None,
existing_episodes: Optional[List[PodcastEpisode]] = None,
) -> PodcastEpisode:
"""Generate a podcast episode from articles."""
# Auto-determine episode number if not provided
if episode_number is None:
if existing_episodes is None:
# Try to load existing episodes from S3 if available
if self._should_use_s3():
existing_feed = self._download_existing_rss()
if existing_feed:
existing_episodes = self.load_existing_feed(existing_feed)
else:
existing_episodes = []
else:
existing_episodes = []
episode_number = self._get_next_episode_number(existing_episodes)
# Get current date for title formatting
pub_date = datetime.now()
day = pub_date.day
ordinal_suffix = self._get_ordinal_suffix(day)
date_str = pub_date.strftime(f"%b {day}{ordinal_suffix}, %Y")
# Create episode title with date
title = f"Episode {episode_number} - {date_str}"
# Create description from articles
description_parts = []
for i, article in enumerate(articles, 1):
if len(articles) > 1:
description_parts.append(f"{i}. {article.title}")
else:
description_parts.append(article.title)
if article.url:
description_parts.append(f" Source: {article.url}")
if i < len(articles):
description_parts.append("")
description = "\n".join(description_parts)
return PodcastEpisode(
title=title,
description=description,
audio_url=audio_url,
pub_date=datetime.now(),
episode_number=episode_number,
duration=duration,
file_size=file_size,
author=self.config.show_name,
)
[docs]
def generate_rss_feed(
self,
episodes: List[PodcastEpisode],
channel_title: Optional[str] = None,
channel_description: Optional[str] = None,
channel_link: Optional[str] = None,
channel_image_url: Optional[str] = None,
channel_email: Optional[str] = None,
) -> str:
"""Generate complete RSS feed XML."""
# Create RSS root element
rss = ET.Element("rss", version="2.0")
rss.set("xmlns:itunes", "http://www.itunes.com/dtds/podcast-1.0.dtd")
rss.set("xmlns:content", "http://purl.org/rss/1.0/modules/content/")
# Create channel
channel = ET.SubElement(rss, "channel")
# Channel metadata
ET.SubElement(channel, "title").text = channel_title or self.config.show_name
ET.SubElement(channel, "description").text = (
channel_description
or f"{self.config.show_name} - Your source for the latest tech news and insights"
)
ET.SubElement(channel, "link").text = (
channel_link or "https://github.com/TheWinterShadow/The-Data-Packet"
)
# Add email contact if provided
email = channel_email or self.config.rss_channel_email
if email:
ET.SubElement(channel, "managingEditor").text = (
f"{email} ({self.config.show_name})"
)
ET.SubElement(channel, "webMaster").text = (
f"{email} ({self.config.show_name})"
)
ET.SubElement(channel, "language").text = "en-us"
ET.SubElement(channel, "lastBuildDate").text = self._format_rfc822_date(
datetime.now()
)
ET.SubElement(channel, "pubDate").text = self._format_rfc822_date(
datetime.now()
)
ET.SubElement(channel, "generator").text = "The Data Packet RSS Generator"
# iTunes specific tags
ET.SubElement(channel, "itunes:subtitle").text = "Tech news and insights"
ET.SubElement(channel, "itunes:author").text = self.config.show_name
ET.SubElement(channel, "itunes:summary").text = (
channel_description
or f"{self.config.show_name} - Your source for the latest tech news and insights"
)
ET.SubElement(channel, "itunes:explicit").text = "no"
# iTunes owner information (requires email)
if email:
itunes_owner = ET.SubElement(channel, "itunes:owner")
ET.SubElement(itunes_owner, "itunes:name").text = self.config.show_name
ET.SubElement(itunes_owner, "itunes:email").text = email
# iTunes category
ET.SubElement(channel, "itunes:category", text="Technology")
# Channel image and iTunes cover art
image_url = channel_image_url or self.config.rss_channel_image_url
if image_url:
# Standard RSS image
image = ET.SubElement(channel, "image")
ET.SubElement(image, "url").text = image_url
ET.SubElement(image, "title").text = channel_title or self.config.show_name
ET.SubElement(image, "link").text = channel_link or ""
# iTunes recommended size
ET.SubElement(image, "width").text = "1400"
ET.SubElement(image, "height").text = "1400"
# iTunes cover art
ET.SubElement(channel, "itunes:image", href=image_url)
# Add episodes
for episode in sorted(episodes, key=lambda x: x.pub_date, reverse=True):
self._add_episode_to_channel(channel, episode)
# Generate XML string
self._indent_xml(rss)
xml_str = ET.tostring(rss, encoding="unicode", xml_declaration=True)
return xml_str
def _get_ordinal_suffix(self, day: int) -> str:
"""Get ordinal suffix for day (1st, 2nd, 3rd, 4th, etc.)."""
if 10 <= day % 100 <= 20:
return "th"
else:
return {1: "st", 2: "nd", 3: "rd"}.get(day % 10, "th")
def _add_episode_to_channel(
self, channel: ET.Element, episode: PodcastEpisode
) -> None:
"""Add episode item to RSS channel."""
item = ET.SubElement(channel, "item")
ET.SubElement(item, "title").text = episode.title
ET.SubElement(item, "description").text = episode.description
ET.SubElement(item, "link").text = episode.audio_url
ET.SubElement(item, "guid").text = episode.guid
ET.SubElement(item, "pubDate").text = self._format_rfc822_date(episode.pub_date)
# Enclosure (audio file)
enclosure_attrs = {"url": episode.audio_url, "type": "audio/mpeg"}
if episode.file_size:
enclosure_attrs["length"] = str(episode.file_size)
ET.SubElement(item, "enclosure", enclosure_attrs)
# iTunes specific tags
ET.SubElement(item, "itunes:subtitle").text = episode.title
ET.SubElement(item, "itunes:summary").text = episode.description
if episode.author:
ET.SubElement(item, "itunes:author").text = episode.author
if episode.duration:
ET.SubElement(item, "itunes:duration").text = episode.duration
if episode.episode_number:
ET.SubElement(item, "itunes:episode").text = str(episode.episode_number)
[docs]
def load_existing_feed(self, rss_content: str) -> List[PodcastEpisode]:
"""Parse existing RSS feed and extract episodes."""
try:
root = ET.fromstring(rss_content)
episodes = []
# Find all item elements
for item in root.findall(".//item"):
title_elem = item.find("title")
description_elem = item.find("description")
enclosure_elem = item.find("enclosure")
guid_elem = item.find("guid")
pub_date_elem = item.find("pubDate")
duration_elem = item.find(
"itunes:duration",
{"itunes": "http://www.itunes.com/dtds/podcast-1.0.dtd"},
)
episode_num_elem = item.find(
"itunes:episode",
{"itunes": "http://www.itunes.com/dtds/podcast-1.0.dtd"},
)
author_elem = item.find(
"itunes:author",
{"itunes": "http://www.itunes.com/dtds/podcast-1.0.dtd"},
)
if title_elem is not None and enclosure_elem is not None:
# Parse episode number from title if not in iTunes tag
episode_number = None
if episode_num_elem is not None and episode_num_elem.text:
try:
episode_number = int(episode_num_elem.text)
except (ValueError, TypeError):
pass
# Parse publication date
pub_date = datetime.now()
if pub_date_elem is not None and pub_date_elem.text:
try:
pub_date = datetime.strptime(
pub_date_elem.text, "%a, %d %b %Y %H:%M:%S %z"
).replace(tzinfo=None)
except ValueError:
pass
# Parse file size
file_size = None
length_attr = enclosure_elem.get("length")
if length_attr:
try:
file_size = int(length_attr)
except (ValueError, TypeError):
pass
episode = PodcastEpisode(
title=title_elem.text if title_elem.text else "",
description=(
description_elem.text
if description_elem is not None and description_elem.text
else ""
),
audio_url=enclosure_elem.get("url", ""),
pub_date=pub_date,
episode_number=episode_number,
duration=(
duration_elem.text
if duration_elem is not None and duration_elem.text
else None
),
file_size=file_size,
guid=guid_elem.text if guid_elem is not None else None,
author=(
author_elem.text
if author_elem is not None and author_elem.text
else None
),
)
episodes.append(episode)
logger.info(f"Loaded {len(episodes)} episodes from existing RSS feed")
return episodes
except ET.ParseError as e:
logger.error(f"Failed to parse RSS feed: {e}")
return []
[docs]
def update_rss_feed(self, new_episode: PodcastEpisode) -> RSSGenerationResult:
"""Update RSS feed with new episode."""
result = RSSGenerationResult()
try:
# Load existing episodes from S3
existing_episodes = []
if self._should_use_s3():
existing_feed = self._download_existing_rss()
if existing_feed:
existing_episodes = self.load_existing_feed(existing_feed)
# Auto-assign episode number if not already set
if not new_episode.episode_number:
new_episode.episode_number = self._get_next_episode_number(
existing_episodes
)
# Update the title with the correct episode number
pub_date = new_episode.pub_date
day = pub_date.day
ordinal_suffix = self._get_ordinal_suffix(day)
date_str = pub_date.strftime(f"%b {day}{ordinal_suffix}, %Y")
new_episode.title = f"Episode {new_episode.episode_number} - {date_str}"
# Add new episode to the beginning
all_episodes = [new_episode] + existing_episodes
# Keep only the latest N episodes (configurable)
max_episodes = getattr(self.config, "max_rss_episodes", 50)
all_episodes = all_episodes[:max_episodes]
# Generate new RSS feed with proper metadata
rss_content = self.generate_rss_feed(
episodes=all_episodes,
channel_title=self.config.rss_channel_title,
channel_description=self.config.rss_channel_description,
channel_link=self.config.rss_channel_link,
channel_image_url=self.config.rss_channel_image_url,
channel_email=self.config.rss_channel_email,
)
result.rss_content = rss_content
# Save locally
local_path = self._save_rss_locally(rss_content)
result.local_path = local_path
# Upload to S3
if self._should_use_s3():
s3_result = self._upload_rss_to_s3(local_path)
if s3_result.success:
result.s3_url = s3_result.s3_url
logger.info(f"RSS feed updated successfully: {s3_result.s3_url}")
else:
logger.warning(
f"Failed to upload RSS to S3: {s3_result.error_message}"
)
result.success = True
except Exception as e:
result.error_message = str(e)
logger.error(f"Failed to update RSS feed: {e}")
return result
def _download_existing_rss(self) -> Optional[str]:
"""Download existing RSS feed from S3."""
if not self.s3_storage:
self.s3_storage = S3Storage()
rss_key = f"{self.config.show_name.lower().replace(' ', '-')}/feed.xml"
try:
response = self.s3_storage.s3_client.get_object(
Bucket=self.s3_storage.bucket_name, Key=rss_key
)
content: str = response["Body"].read().decode("utf-8")
logger.info("Downloaded existing RSS feed from S3")
return content
except Exception as e:
logger.info(f"No existing RSS feed found in S3 (or error downloading): {e}")
return None
def _save_rss_locally(self, rss_content: str) -> Path:
"""Save RSS feed to local file."""
# Ensure output directory exists
self.config.output_directory.mkdir(parents=True, exist_ok=True)
# Generate filename
rss_filename = f"{self.config.show_name.lower().replace(' ', '-')}_feed.xml"
rss_path = self.config.output_directory / rss_filename
try:
with open(rss_path, "w", encoding="utf-8") as f:
f.write(rss_content)
logger.info(f"RSS feed saved locally to {rss_path}")
return rss_path
except Exception as e:
raise TheDataPacketError(f"Failed to save RSS feed: {e}")
def _upload_rss_to_s3(self, rss_path: Path) -> S3UploadResult:
"""Upload RSS feed to S3."""
if not self.s3_storage:
self.s3_storage = S3Storage()
# Use consistent S3 key for RSS feed
rss_key = f"{self.config.show_name.lower().replace(' ', '-')}/feed.xml"
return self.s3_storage.upload_file(
rss_path, rss_key, content_type="application/rss+xml"
)
def _should_use_s3(self) -> bool:
"""Check if S3 should be used for uploads."""
return bool(self.config.s3_bucket_name and self.config.aws_access_key_id)
def _format_rfc822_date(self, dt: datetime) -> str:
"""Format datetime as RFC 822 date string."""
return dt.strftime("%a, %d %b %Y %H:%M:%S +0000")
def _indent_xml(self, elem: ET.Element, level: int = 0) -> None:
"""Add indentation to XML for pretty printing."""
i = "\n" + level * " "
if len(elem):
if not elem.text or not elem.text.strip():
elem.text = i + " "
if not elem.tail or not elem.tail.strip():
elem.tail = i
for elem in elem:
self._indent_xml(elem, level + 1)
if not elem.tail or not elem.tail.strip():
elem.tail = i
else:
if level and (not elem.tail or not elem.tail.strip()):
elem.tail = i