# Source code for the_data_packet.workflows.podcast

"""Main podcast generation workflow."""

import traceback
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import List, Optional

from the_data_packet.core.config import Config, get_config
from the_data_packet.core.exceptions import TheDataPacketError, ValidationError
from the_data_packet.core.logging import get_logger, upload_current_day_log
from the_data_packet.generation.audio import AudioGenerator, AudioResult
from the_data_packet.generation.rss import RSSGenerator
from the_data_packet.generation.script import ScriptGenerator
from the_data_packet.sources.base import Article
from the_data_packet.sources.techcrunch import TechCrunchSource
from the_data_packet.sources.wired import WiredSource
from the_data_packet.utils.mongodb import MongoDBClient
from the_data_packet.utils.s3 import S3Storage, S3UploadResult

logger = get_logger(__name__)


@dataclass
class PodcastResult:
    """Result of podcast generation workflow."""

    # True only when the whole pipeline completed without raising.
    success: bool = False
    # Number of articles remaining after deduplication against past episodes.
    number_of_articles_collected: int = 0
    # The (deduplicated) articles used for this episode.
    articles_collected: List[Article] = field(default_factory=list)
    # Per-step completion flags.
    script_generated: bool = False
    audio_generated: bool = False
    rss_generated: bool = False
    # Local paths of generated artifacts (None when the step was skipped/failed).
    script_path: Optional[Path] = None
    audio_path: Optional[Path] = None
    rss_path: Optional[Path] = None
    # Public S3 URLs of uploaded artifacts (None when S3 is not configured).
    s3_script_url: Optional[str] = None
    s3_audio_url: Optional[str] = None
    s3_rss_url: Optional[str] = None
    # Wall-clock duration of the run; set on both success and failure paths.
    execution_time_seconds: Optional[float] = None
    # str() of the exception that aborted the run, if any.
    error_message: Optional[str] = None
class PodcastPipeline:
    """Main podcast generation pipeline.

    Orchestrates the full episode workflow: collect articles, deduplicate
    against MongoDB, generate a script, synthesize audio, upload artifacts
    to S3, and update the RSS feed.
    """

    # Available article sources, keyed by the names used in
    # ``config.article_sources``.
    SOURCES = {
        "wired": WiredSource,
        "techcrunch": TechCrunchSource,
    }

    def __init__(self, config: Optional[Config] = None):
        """
        Initialize the podcast pipeline.

        Args:
            config: Pipeline configuration (defaults to global config)

        Raises:
            ValidationError: If the configuration is invalid.
        """
        self.config = config or get_config()

        # Fail fast on bad configuration before any expensive work.
        self._validate_config()

        # Components are created lazily on first use.
        self._script_generator: Optional[ScriptGenerator] = None
        self._audio_generator: Optional[AudioGenerator] = None
        self._rss_generator: Optional[RSSGenerator] = None
        self._s3_storage: Optional[S3Storage] = None

        logger.info(f"Initialized podcast pipeline for '{self.config.show_name}'")

    def run(self) -> PodcastResult:
        """
        Execute the complete podcast generation pipeline.

        Returns:
            PodcastResult with execution details (populated on both
            success and failure; never raises).
        """
        start_time = datetime.now()
        result = PodcastResult()

        logger.info("Starting podcast generation pipeline")
        logger.info(f"Sources: {self.config.article_sources}")
        logger.info(f"Categories: {self.config.article_categories}")
        logger.info(f"Output: {self.config.output_directory}")

        try:
            # Step 1: Collect articles and drop any already used in a
            # previous episode.
            articles = self._collect_articles()
            new_articles = self._remove_already_used_articles(articles)
            result.number_of_articles_collected = len(new_articles)
            result.articles_collected = new_articles

            if not new_articles:
                logger.error("No new articles collected after deduplication")
                # Upload log file before failing
                self._upload_log_safely()
                raise TheDataPacketError("No new articles were collected")

            # Record the articles we are about to use so future runs skip them.
            self._add_article_to_db(new_articles)

            # Step 2: Generate script (if enabled).
            # BUGFIX: generate from the deduplicated list (new_articles), not
            # the raw collection — otherwise already-used articles could be
            # included in the episode.
            script_content = None
            if self.config.generate_script:
                self.config.validate_for_script_generation()
                script_content = self._generate_script(new_articles)
                script_path = self._save_script(script_content)
                result.script_generated = True
                result.script_path = script_path

                # Upload to S3 (if configured)
                if self._should_use_s3():
                    s3_result = self._upload_to_s3(script_path)
                    result.s3_script_url = s3_result.s3_url

            # Step 3: Generate audio (if enabled).
            # Initialize explicitly instead of probing `locals()` later.
            audio_result = None
            if self.config.generate_audio:
                if not script_content:
                    raise TheDataPacketError(
                        "Script content required for audio generation"
                    )
                self.config.validate_for_audio_generation()
                audio_result = self._generate_audio(script_content)
                result.audio_generated = True
                result.audio_path = audio_result.output_file

                # Upload to S3 (if configured)
                if self._should_use_s3():
                    s3_result = self._upload_to_s3(audio_result.output_file)
                    result.s3_audio_url = s3_result.s3_url

            # Step 4: Generate/Update RSS feed (if enabled and audio was uploaded)
            if self.config.generate_rss and result.s3_audio_url:
                self._generate_rss_feed(new_articles, result, audio_result)

            # Calculate execution time
            result.execution_time_seconds = (
                datetime.now() - start_time
            ).total_seconds()
            result.success = True

            logger.info(
                f"Pipeline completed successfully in {result.execution_time_seconds:.1f} seconds"
            )

            # Upload current log file to S3 alongside generated files
            self._upload_log_safely()

            # Save episode metadata (non-critical, don't fail pipeline)
            try:
                self._save_episode_metadata(result)
            except Exception as e:
                logger.warning(f"Failed to save episode metadata to MongoDB: {e}")

            return result

        except Exception as e:
            result.execution_time_seconds = (
                datetime.now() - start_time
            ).total_seconds()
            result.error_message = str(e)

            logger.error(
                f"Pipeline failed after {result.execution_time_seconds:.1f} seconds: {e}"
            )
            self._upload_log_safely()
            return result

    def _upload_log_safely(self) -> None:
        """Best-effort upload of the current day's log file to S3.

        No-op when S3 is not configured; failures are logged as warnings
        and never propagate (this was previously copy-pasted in three
        places inside run(), with an exception-variable shadowing hazard).
        """
        if not self._should_use_s3():
            return
        try:
            logger.info("Uploading current day's log file to S3")
            upload_current_day_log(self.config)
        except Exception as e:
            logger.warning(f"Failed to upload current day's log file to S3: {e}")

    def _collect_articles(self) -> List[Article]:
        """Collect articles from all configured sources.

        Unknown sources and unsupported categories are skipped with a
        warning; per-category fetch failures are logged and do not abort
        collection.

        Returns:
            All articles that pass ``Article.is_valid()``.
        """
        logger.info("Collecting articles")

        all_articles = []

        for source_name in self.config.article_sources:
            if source_name not in self.SOURCES:
                logger.warning(f"Unknown article source: {source_name}")
                continue

            source_class = self.SOURCES[source_name]
            source = source_class()

            for category in self.config.article_categories:
                # Skip categories not supported by this source
                if category not in source.supported_categories:
                    logger.warning(
                        f"Category '{category}' not supported by {source_name}"
                    )
                    continue

                try:
                    logger.info(f"Collecting {category} articles from {source_name}")

                    if self.config.max_articles_per_source == 1:
                        article = source.get_latest_article(category)
                        all_articles.append(article)
                    else:
                        articles = source.get_multiple_articles(
                            category, self.config.max_articles_per_source
                        )
                        all_articles.extend(articles)

                except Exception as e:
                    logger.error(
                        f"Failed to collect {category} articles from {source_name}: {e}"
                    )
                    logger.error(traceback.format_exc())

        # Filter valid articles
        valid_articles = [a for a in all_articles if a.is_valid()]

        logger.info(
            f"Collected {len(valid_articles)} valid articles (out of {len(all_articles)} total)"
        )

        return valid_articles

    def _remove_already_used_articles(self, articles: List[Article]) -> List[Article]:
        """Check if the article has already been used in previous episodes.

        This prevents content duplication by checking the MongoDB 'articles'
        collection for any articles that have been used in previous podcast
        episodes. Deduplication is best-effort: if MongoDB is unavailable or
        the lookup fails, all articles are returned unchanged.

        Args:
            articles: List of articles to check for duplication

        Returns:
            List of articles that have not been used before
        """
        if not self.config.mongodb_username or not self.config.mongodb_password:
            logger.warning(
                "MongoDB credentials are not configured, skipping deduplication"
            )
            return articles

        logger.info(
            f"Attempting MongoDB connection for deduplication with username: {self.config.mongodb_username}"
        )

        try:
            mongo_client = MongoDBClient(
                username=self.config.mongodb_username,
                password=self.config.mongodb_password,
            )
            logger.info("MongoDB client created successfully for deduplication")
        except Exception as e:
            logger.error(f"Failed to create MongoDB client for deduplication: {e}")
            logger.warning("Proceeding without deduplication")
            return articles

        new_articles = []
        try:
            for article in articles:
                # Articles are keyed by URL in the 'articles' collection.
                logger.debug(f"Checking if article already exists: {article.title}")
                existing = mongo_client.find_documents("articles", {"url": article.url})
                if len(list(existing)) > 0:
                    logger.info(
                        f"Article already used in previous episode: {article.title}"
                    )
                    continue
                new_articles.append(article)

            logger.info(
                f"Deduplication complete: {len(new_articles)}/{len(articles)} articles are new"
            )
        except Exception as e:
            logger.error(f"Error during deduplication: {e}")
            logger.warning("Proceeding with all articles (deduplication failed)")
            new_articles = articles
        finally:
            mongo_client.close()

        return new_articles

    def _add_article_to_db(self, articles: List[Article]) -> None:
        """Add used articles to the MongoDB database for future deduplication.

        Stores article information to prevent reuse in future episodes.
        This ensures that each podcast episode contains unique content.
        Storage is best-effort: failures are logged but never raised.

        Args:
            articles: List of Article objects to store in the database
        """
        if not self.config.mongodb_username or not self.config.mongodb_password:
            logger.warning(
                "MongoDB credentials are not configured, skipping article storage"
            )
            return

        logger.info(
            f"Attempting MongoDB connection for article storage with username: {self.config.mongodb_username}"
        )

        try:
            mongo_client = MongoDBClient(
                username=self.config.mongodb_username,
                password=self.config.mongodb_password,
            )
            logger.info("MongoDB client created successfully for article storage")
        except Exception as e:
            logger.error(f"Failed to create MongoDB client for article storage: {e}")
            logger.warning("Proceeding without storing articles")
            return

        article_docs = [article.to_dict() for article in articles]
        logger.info(f"Storing {len(article_docs)} articles to MongoDB")

        try:
            for i, article in enumerate(article_docs):
                logger.debug(
                    f"Storing article {i+1}/{len(article_docs)}: {article.get('title', 'Unknown')}"
                )
                mongo_client.insert_document("articles", article)
            logger.info(
                f"Successfully stored {len(article_docs)} articles to MongoDB database"
            )
        except Exception as e:
            logger.error(f"Failed to store articles to MongoDB: {e}")
        finally:
            mongo_client.close()

    def _generate_script(self, articles: List[Article]) -> str:
        """Generate podcast script text from the given articles."""
        logger.info("Generating podcast script")

        # Lazily construct the generator on first use.
        if not self._script_generator:
            self._script_generator = ScriptGenerator()

        return self._script_generator.generate_script(articles)

    def _generate_audio(self, script_content: str) -> AudioResult:
        """Generate audio from script text."""
        logger.info("Generating podcast audio")

        # Lazily construct the generator on first use.
        if not self._audio_generator:
            self._audio_generator = AudioGenerator()

        return self._audio_generator.generate_audio(script_content)

    def _save_script(self, script_content: str) -> Path:
        """Save script to a timestamped text file in the output directory.

        Raises:
            TheDataPacketError: If the file cannot be written.
        """
        # Ensure output directory exists
        self.config.output_directory.mkdir(parents=True, exist_ok=True)

        # Generate filename with timestamp if not specified
        script_filename = (
            f"episode_script_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt"
        )
        script_path = self.config.output_directory / script_filename

        try:
            with open(script_path, "w", encoding="utf-8") as f:
                f.write(script_content)

            logger.info(f"Script saved to {script_path}")
            return script_path

        except Exception as e:
            raise TheDataPacketError(f"Failed to save script: {e}")

    def _upload_to_s3(self, file_path: Path) -> S3UploadResult:
        """Upload a file to S3 under a show-name/date-prefixed key."""
        if not self._s3_storage:
            self._s3_storage = S3Storage()

        # Key layout: <show-name-slug>/<YYYY-MM-DD>/<filename>
        timestamp = datetime.now().strftime("%Y-%m-%d")
        s3_key = f"{self.config.show_name.lower().replace(' ', '-')}/{timestamp}/{file_path.name}"

        return self._s3_storage.upload_file(file_path, s3_key)

    def _should_use_s3(self) -> bool:
        """Check if S3 should be used for uploads."""
        return bool(self.config.s3_bucket_name and self.config.aws_access_key_id)

    def _generate_rss_feed(
        self,
        articles: List[Article],
        result: PodcastResult,
        audio_result: Optional[AudioResult] = None,
    ) -> None:
        """Generate and upload RSS feed for the new episode.

        Mutates ``result`` with RSS status/paths on success. RSS failures
        are logged but never propagated — the pipeline still succeeds.
        """
        try:
            logger.info("Generating RSS feed")

            if not self._rss_generator:
                self._rss_generator = RSSGenerator(self.config)

            # Determine episode number (will be auto-assigned in RSS generator)
            episode_number = None  # Let RSS generator determine the next number

            # Get audio file size if available
            file_size = None
            duration = None
            if audio_result and audio_result.output_file.exists():
                file_size = audio_result.output_file.stat().st_size
                # Duration could be extracted from audio_result if available
                duration = getattr(audio_result, "duration", None)

            # Create podcast episode from articles
            if result.s3_audio_url:
                episode = self._rss_generator.generate_episode_from_articles(
                    articles=articles,
                    audio_url=result.s3_audio_url,
                    episode_number=episode_number,
                    duration=duration,
                    file_size=file_size,
                )
            else:
                logger.error("No S3 audio URL available for RSS generation")
                return

            # Update RSS feed
            rss_result = self._rss_generator.update_rss_feed(episode)

            if rss_result.success:
                result.rss_generated = True
                result.rss_path = rss_result.local_path
                result.s3_rss_url = rss_result.s3_url
                logger.info("RSS feed updated successfully")
                if rss_result.s3_url:
                    logger.info(f"RSS feed URL: {rss_result.s3_url}")
            else:
                logger.error(f"Failed to update RSS feed: {rss_result.error_message}")

        except Exception as e:
            logger.error(f"RSS feed generation failed: {e}")
            # Don't fail the entire pipeline for RSS errors

    def _validate_config(self) -> None:
        """Validate pipeline configuration.

        Raises:
            ValidationError: If sources are unknown or no generation step
                is enabled.
        """
        errors = []

        # Validate article sources
        unknown_sources = set(self.config.article_sources) - set(self.SOURCES.keys())
        if unknown_sources:
            errors.append(f"Unknown article sources: {', '.join(unknown_sources)}")

        # Validate that at least one generation is enabled
        if not self.config.generate_script and not self.config.generate_audio:
            errors.append(
                "At least one of generate_script or generate_audio must be enabled"
            )

        if errors:
            raise ValidationError(
                f"Configuration validation failed: {'; '.join(errors)}"
            )

    def _save_episode_metadata(self, episode_data: PodcastResult) -> None:
        """Save episode metadata to MongoDB for tracking and analytics.

        Stores comprehensive episode information including:
        - Episode success status and execution time
        - List of articles used (without full content to save space)
        - Generated file paths
        - Timestamps and other metadata

        Args:
            episode_data: PodcastResult containing episode information to save
        """
        if not self.config.mongodb_username or not self.config.mongodb_password:
            logger.warning(
                "MongoDB credentials are not configured, skipping metadata save"
            )
            return

        # BUGFIX: the client was previously never closed; close it in
        # `finally`, matching the other MongoDB helpers in this class.
        mongo_client = None
        try:
            mongo_client = MongoDBClient(
                username=self.config.mongodb_username,
                password=self.config.mongodb_password,
            )

            # Convert the episode data to a dictionary, converting Article objects to dicts
            episode_dict = episode_data.__dict__.copy()
            episode_dict["articles_collected"] = [
                article.to_dict() for article in episode_data.articles_collected
            ]

            # Convert Path objects to strings for MongoDB compatibility
            if episode_dict.get("script_path"):
                episode_dict["script_path"] = str(episode_dict["script_path"])
            if episode_dict.get("audio_path"):
                episode_dict["audio_path"] = str(episode_dict["audio_path"])
            if episode_dict.get("rss_path"):
                episode_dict["rss_path"] = str(episode_dict["rss_path"])

            # Remove article content to save space in database
            for article in episode_dict["articles_collected"]:
                article.pop("content", None)

            mongo_client.insert_document("episodes", episode_dict)
            logger.info("Added episode metadata to MongoDB database")
        except Exception as e:
            logger.warning(f"Failed to save episode metadata to MongoDB: {e}")
            # Don't re-raise the exception - metadata save is not critical
        finally:
            if mongo_client is not None:
                mongo_client.close()