bots

A place where I keep all my bots.
Log | Files | Refs

bot.py (6785B)


      1 import os
      2 import sys
      3 import subprocess
      4 import requests
      5 import frontmatter
      6 from bs4 import BeautifulSoup
      7 from telegram import Update
      8 from telegram.ext import ApplicationBuilder, CommandHandler, MessageHandler, filters, ContextTypes, ConversationHandler
      9 from dotenv import load_dotenv
     10 
     11 # --- CONFIGURATION ---
     12 load_dotenv()
     13 REPO_PATH = os.getenv("REPO_PATH") 
     14 MD_FILE_PATH = os.path.join(REPO_PATH, "content/listen.md")
     15 IMG_DIR = os.path.join(REPO_PATH, "content/assets/covers")
     16 MAX_SONGS = 9
     17 OWNER_ID = int(os.getenv("BOT_OWNER_ID", "0"))
     18 
     19 if OWNER_ID == 0:
     20     raise RuntimeError("BOT_OWNER_ID not set")
     21 
     22 # Conversation states
     23 TITLE, ARTIST, LINK, COVER = range(4)
     24 
     25 sys.stdout = open('/home/_listen-bot/bots/listen-page-bot/bot_debug.log', 'a', buffering=1)
     26 sys.stderr = sys.stdout
     27 
     28 print("--- Bot started ---")
     29 
     30 async def start(update: Update, context: ContextTypes.DEFAULT_TYPE):
     31     if not await is_authorized(update):
     32         await update.message.reply_text("Unauthorized.")
     33         return ConversationHandler.END
     34 
     35     await update.message.reply_text("Let's add a new song! What is the Song Title?")
     36     return TITLE
     37 
     38 async def get_title(update: Update, context: ContextTypes.DEFAULT_TYPE):
     39     if not await is_authorized(update):  
     40         await update.message.reply_text("Unauthorized.")
     41         return ConversationHandler.END
     42 
     43     context.user_data['title'] = update.message.text
     44     await update.message.reply_text(f"Artist for '{update.message.text}'?")
     45     return ARTIST
     46 
     47 async def get_artist(update: Update, context: ContextTypes.DEFAULT_TYPE):
     48     if not await is_authorized(update):
     49         await update.message.reply_text("Unauthorized.")
     50         return ConversationHandler.END
     51 
     52     context.user_data['artist'] = update.message.text
     53     await update.message.reply_text("Paste the YouTube Music link:")
     54     return LINK
     55 
     56 async def get_link(update: Update, context: ContextTypes.DEFAULT_TYPE):
     57     if not await is_authorized(update):
     58         await update.message.reply_text("Unauthorized.")
     59         return ConversationHandler.END
     60 
     61     context.user_data['link'] = update.message.text
     62     await update.message.reply_text("Finally, send the Album Cover image.")
     63     return COVER
     64 
     65 async def get_cover(update: Update, context: ContextTypes.DEFAULT_TYPE):
     66     if not await is_authorized(update):
     67         await update.message.reply_text("Unauthorized.")
     68         return ConversationHandler.END
     69 
     70     # Get the highest resolution photo
     71     photo_file = await update.message.photo[-1].get_file()
     72     
     73     # Generate filename (e.g., song_name.jpg)
     74     safe_name = "".join([c for c in context.user_data['title'] if c.isalnum()]).lower()
     75     img_filename = f"{safe_name}.jpg"
     76     img_path = os.path.join(IMG_DIR, img_filename)
     77     
     78     # Ensure directory exists
     79     os.makedirs(IMG_DIR, exist_ok=True)
     80     
     81     # Download image
     82     await photo_file.download_to_drive(img_path)
     83     
     84     # Update the Markdown file
     85     update_markdown(
     86         title=context.user_data['title'],
     87         artist=context.user_data['artist'],
     88         link=context.user_data['link'],
     89         img_url=f"/assets/covers/{img_filename}" # Relative path for HTML
     90     )
     91 
     92     run_local_deploy(REPO_PATH)
     93 
     94     await update.message.reply_text("Successfully updated listen.md and saved cover!")
     95     return ConversationHandler.END
     96 
     97 def update_markdown(title, artist, link, img_url):
     98     # Load the MD file
     99     post = frontmatter.load(MD_FILE_PATH)
    100     
    101     # Use BeautifulSoup to parse the HTML within the markdown content
    102     soup = BeautifulSoup(post.content, 'html.parser')
    103     gallery = soup.find('section', class_='song-gallery')
    104     
    105     if not gallery:
    106         # Fallback if section doesn't exist
    107         return
    108 
    109     # Create new song card HTML
    110     new_card_html = f'''
    111   <div class="song-card">
    112     <img src="{img_url}" alt="{title} Album Art">
    113     <div class="song-info">
    114       <span class="song-title">{title}</span>
    115       <span class="artist-name">{artist}</span>
    116       <a href="{link}" class="song-link">Listen</a>
    117     </div>
    118   </div>'''
    119     new_card_soup = BeautifulSoup(new_card_html, 'html.parser')
    120     
    121     # Add new card to the top (FIFO behavior)
    122     gallery.insert(0, new_card_soup)
    123     
    124     # Enforce MAX_SONGS (9)
    125     cards = gallery.find_all('div', class_='song-card')
    126     if len(cards) > MAX_SONGS:
    127         for extra in cards[MAX_SONGS:]:
    128             extra.decompose()
    129 
    130     # Format the content back (BeautifulSoup adds extra formatting, 
    131     # so we clean up the string slightly)
    132     post.content = f"\nAn overview of music and podcasts I currently have on repeat. [Checkout](blog/listen-cms/) how I manage this page to keep it current, without touching a single piece of code (until it breaks).\n\n## Music\n\n{gallery.prettify()}"
    133     
    134     # Save file
    135     with open(MD_FILE_PATH, 'wb') as f:
    136         frontmatter.dump(post, f)
    137 
    138 async def is_authorized(update):
    139     user = update.effective_user
    140     if not user:
    141         return False
    142 
    143     if user.id != OWNER_ID:
    144         print(f"Unauthorized access attempt from {user.id}")
    145         return False
    146 
    147     return True
    148 
    149 def run_local_deploy(repo_path):
    150     try:
    151         # 1. Git Add & Commit
    152         subprocess.run(["git", "add", "."], cwd=repo_path, check=True)
    153         subprocess.run(["git", "commit", "-m", "Bot: auto-update listen.md"], cwd=repo_path, check=True)
    154         
    155         # 2. Git Push 
    156         # (Works instantly if remote is a local path or SSH key has no passphrase)
    157         subprocess.run(["git", "push"], cwd=repo_path, check=True)
    158         
    159         # 3. Make Deploy
    160         # This calls the Makefile which uses 'doas rsync'
    161         subprocess.run(["make", "deploy"], cwd=repo_path, check=True)
    162         
    163         return True
    164     except subprocess.CalledProcessError as e:
    165         print(f"Deployment failed at: {e.cmd}")
    166         return False
    167 
    168 async def cancel(update: Update, context: ContextTypes.DEFAULT_TYPE):
    169     await update.message.reply_text("Cancelled.")
    170     return ConversationHandler.END
    171 
    172 if __name__ == '__main__':
    173     repo_path = REPO_PATH
    174     subprocess.run(["git", "fetch", "origin" ], cwd=repo_path, check=True)
    175     subprocess.run(["git", "pull" ], cwd=repo_path, check=True)
    176 
    177     app = ApplicationBuilder().token(os.getenv("TELEGRAM_TOKEN")).build()
    178 
    179     conv_handler = ConversationHandler(
    180         entry_points=[CommandHandler('add', start)],
    181         states={
    182             TITLE: [MessageHandler(filters.TEXT & ~filters.COMMAND, get_title)],
    183             ARTIST: [MessageHandler(filters.TEXT & ~filters.COMMAND, get_artist)],
    184             LINK: [MessageHandler(filters.TEXT & ~filters.COMMAND, get_link)],
    185             COVER: [MessageHandler(filters.PHOTO, get_cover)],
    186         },
    187         fallbacks=[CommandHandler('cancel', cancel)],
    188     )
    189 
    190     app.add_handler(conv_handler)
    191     print("Bot is running...")
    192     app.run_polling()