import asyncio import json import websockets import curses import random import colorsys import time from collections import deque class BlueskyFirehoseVisualizer: def __init__(self, stdscr, websocket_url): self.stdscr = stdscr self.websocket_url = websocket_url # Initialize color pairs curses.start_color() curses.use_default_colors() # Set up screen curses.curs_set(0) self.stdscr.clear() # Track posts with their display details self.posts = {} self.max_posts = 1000 # Limit to prevent memory growth # Generate color palette self.color_palette = self._generate_color_palette() def _generate_color_palette(self, num_colors=256): """Generate a diverse color palette.""" colors = [] for i in range(num_colors): # Use HSV color space to generate visually distinct colors hue = (i / num_colors) % 1.0 saturation = 0.7 + (random.random() * 0.3) # 70-100% saturation value = 0.7 + (random.random() * 0.3) # 70-100% brightness # Convert HSV to RGB rgb = colorsys.hsv_to_rgb(hue, saturation, value) # Scale RGB to curses color range (0-1000) r, g, b = [int(x * 1000) for x in rgb] # Initialize color pair try: color_index = len(colors) + 1 # Start from 1 curses.init_color(color_index, r, g, b) curses.init_pair(color_index, color_index, -1) colors.append(color_index) except Exception: # If we run out of color pairs, wrap around color_index = (len(colors) % 256) + 1 colors.append(color_index) return colors def _display_post(self, post_id, text): """Display a post with a unique color, fading out over time.""" # Assign a unique color color_index = self.color_palette[hash(post_id) % len(self.color_palette)] # Get screen dimensions max_y, max_x = self.stdscr.getmaxyx() # Trim text to fit screen width text = text[:max_x-1] # Track post details if post_id not in self.posts: # Remove oldest post if we've reached max if len(self.posts) >= self.max_posts: oldest_id = min(self.posts, key=lambda k: self.posts[k]['timestamp']) del self.posts[oldest_id] # Find a free vertical position used_y = set(post['y'] for post in self.posts.values()) y = next(i for i in range(max_y) if i not in used_y) self.posts[post_id] = { 'text': text, 'color': color_index, 'y': y, 'timestamp': time.time(), 'fade_count': 0 } # Render posts for pid, post_info in list(self.posts.items()): # Calculate fade effect age = time.time() - post_info['timestamp'] fade_speed = 0.5 # Adjust for desired fade speed if age > fade_speed * post_info['fade_count']: try: # Gradually reduce text intensity intensity = max(0, 1 - (post_info['fade_count'] / 10)) color = curses.color_pair(post_info['color']) # Render text at constant horizontal position self.stdscr.addstr( post_info['y'], post_info['fade_count'], post_info['text'][:max_x-1], color ) post_info['fade_count'] += 1 except curses.error: # If we can't write (e.g., screen boundaries), remove post del self.posts[pid] # Refresh display self.stdscr.refresh() async def connect_and_visualize(self): """Connect to Bluesky firehose and visualize posts.""" try: async with websockets.connect(self.websocket_url) as websocket: while True: message = await websocket.recv() post = json.loads(message) # Extract meaningful text (adjust based on actual Bluesky JSON structure) post_id = post.get('id', str(random.random())) text = post.get('text', 'Unknown post') # Display the post self._display_post(post_id, text) except Exception as e: self.stdscr.addstr(0, 0, f"Error: {str(e)}") self.stdscr.refresh() def run(self): """Run the visualizer.""" asyncio.run(self.connect_and_visualize()) def main(stdscr): # Replace with actual Bluesky firehose websocket URL BLUESKY_FIREHOSE_WS = "ws://example.com/bluesky-firehose" visualizer = BlueskyFirehoseVisualizer(stdscr, BLUESKY_FIREHOSE_WS) visualizer.run() if __name__ == "__main__": # Wrap main in curses wrapper to handle terminal setup/teardown curses.wrapper(main) # Dependencies (install with pip): # websockets # # Note: You'll need to replace the websocket URL with the actual # Bluesky firehose websocket endpoint when available.