# Bluesky firehose visualizer: renders posts from a websocket feed in a curses terminal UI.
import asyncio
import colorsys
import curses
import json
import random
import time
import unicodedata
from collections import deque

import websockets


class BlueskyFirehoseVisualizer:
    """Paint posts from a websocket feed onto a curses screen, one per row.

    Each post is assigned a color pair from a generated palette, drawn at the
    left edge of its own row, stepped from bold through normal to dim as it
    ages, and erased once the fade completes so the row can be reused.
    Fade steps only advance while messages keep arriving, because rendering
    is triggered by ``_display_post``.
    """

    # Number of redraws a post goes through before it is erased.
    FADE_STEPS = 10
    # Minimum seconds between two fade steps of the same post.
    FADE_INTERVAL = 0.5
    # Custom colors are written from this slot upward so the conventional
    # 16 ANSI slots (0-15) used by the rest of the terminal stay untouched.
    FIRST_CUSTOM_COLOR = 16

    def __init__(self, stdscr, websocket_url):
        """Prepare the screen and build the color palette.

        Args:
            stdscr: The curses window to draw on (as passed by
                ``curses.wrapper``).
            websocket_url: URL handed to ``websockets.connect``.
        """
        self.stdscr = stdscr
        self.websocket_url = websocket_url

        # Color and cursor setup raise curses.error on terminals lacking the
        # capability; the visualizer then degrades instead of crashing.
        self._background = -1
        try:
            curses.start_color()
        except curses.error:
            pass  # no color support: the palette falls back to pair 0
        try:
            curses.use_default_colors()
        except curses.error:
            # -1 (terminal default) is only valid after use_default_colors().
            self._background = curses.COLOR_BLACK
        try:
            curses.curs_set(0)
        except curses.error:
            pass  # terminal cannot hide the cursor; purely cosmetic

        self.stdscr.clear()

        # post_id -> {'text', 'color', 'y', 'timestamp', 'fade_count'};
        # dict insertion order doubles as age order (oldest first).
        self.posts = {}
        self.max_posts = 1000  # Limit to prevent memory growth

        self.color_palette = self._generate_color_palette()

    @staticmethod
    def _vivid_rgb(hue):
        """Return a random bright color of the given hue as curses RGB.

        Saturation and value are each drawn from 70-100%; the components are
        scaled to the 0-1000 range that ``curses.init_color`` expects.
        """
        saturation = 0.7 + random.random() * 0.3
        value = 0.7 + random.random() * 0.3
        rgb = colorsys.hsv_to_rgb(hue % 1.0, saturation, value)
        return tuple(int(component * 1000) for component in rgb)

    def _generate_color_palette(self, num_colors=256):
        """Initialise up to ``num_colors`` color pairs and return their numbers.

        Only slots the terminal reports as available are touched. When the
        terminal can redefine colors, custom hues are written starting at
        ``FIRST_CUSTOM_COLOR``; otherwise the terminal's existing colors are
        used as foregrounds.

        Returns:
            A non-empty list of initialised pair numbers, or ``[0]`` (the
            default pair) when no color pair could be set up.
        """
        try:
            has_colors = curses.has_colors()
            can_redefine = has_colors and curses.can_change_color()
        except curses.error:
            has_colors = can_redefine = False

        # COLORS / COLOR_PAIRS only exist after a successful start_color().
        color_slots = getattr(curses, "COLORS", 0)
        pair_slots = getattr(curses, "COLOR_PAIRS", 0)
        if not has_colors or color_slots < 2 or pair_slots < 2:
            return [0]

        if can_redefine and color_slots > self.FIRST_CUSTOM_COLOR:
            foregrounds = range(self.FIRST_CUSTOM_COLOR, color_slots)
        else:
            can_redefine = False
            foregrounds = range(1, color_slots)

        # Pair 0 is reserved, so at most pair_slots - 1 pairs are usable.
        wanted = min(num_colors, len(foregrounds), pair_slots - 1)
        background = getattr(self, "_background", -1)

        pairs = []
        for offset in range(wanted):
            color_number = foregrounds[offset]
            pair_number = offset + 1
            try:
                if can_redefine:
                    red, green, blue = self._vivid_rgb(offset / wanted)
                    curses.init_color(color_number, red, green, blue)
                curses.init_pair(pair_number, color_number, background)
            except (curses.error, ValueError, OverflowError):
                # The terminal rejected this slot despite its advertised
                # limits; keep the pairs that were initialised so far.
                break
            pairs.append(pair_number)

        return pairs or [0]

    @classmethod
    def _fade_attr(cls, fade_count):
        """Map a fade step to a text attribute: bold, then normal, then dim."""
        intensity = max(0.0, 1 - fade_count / cls.FADE_STEPS)
        if intensity > 2 / 3:
            return curses.A_BOLD
        if intensity > 1 / 3:
            return curses.A_NORMAL
        return curses.A_DIM

    @staticmethod
    def _clean_text(text):
        """Flatten ``text`` to a single printable line.

        Non-printable, non-whitespace characters are dropped and every run of
        whitespace (including newlines) becomes one space, so a post can never
        move the cursor onto another post's row.
        """
        if not isinstance(text, str):
            text = str(text)
        kept = "".join(ch for ch in text if ch.isprintable() or ch.isspace())
        return " ".join(kept.split())

    @staticmethod
    def _fit_to_width(text, width):
        """Return the longest prefix of ``text`` that fits in ``width`` columns.

        East Asian wide/fullwidth characters count as two columns and
        non-spacing/enclosing marks as zero; everything else counts as one.
        """
        if width <= 0:
            return ""
        used = 0
        kept = []
        for ch in text:
            if unicodedata.category(ch) in ("Mn", "Me"):
                columns = 0
            elif unicodedata.east_asian_width(ch) in ("W", "F"):
                columns = 2
            else:
                columns = 1
            if used + columns > width:
                break
            kept.append(ch)
            used += columns
        return "".join(kept)

    @staticmethod
    def _parse_message(message):
        """Decode one websocket frame into ``(post_id, text)``.

        Returns ``None`` for frames that are not a JSON object so a single
        malformed message does not end the stream. A missing id is replaced
        by a random one and a missing or non-string text by
        ``'Unknown post'``.
        """
        try:
            post = json.loads(message)
        except (TypeError, ValueError):
            return None
        if not isinstance(post, dict):
            return None

        # NOTE(review): the 'id' / 'text' keys are placeholders; adjust to the
        # actual feed schema.
        post_id = post.get('id')
        if post_id is None:
            post_id = random.random()
        text = post.get('text', 'Unknown post')
        if not isinstance(text, str):
            text = 'Unknown post'
        # str() keeps the id hashable even if the feed sends a list or dict.
        return str(post_id), text

    def _claim_row(self, max_y):
        """Return a screen row for a new post, or ``None`` if there are no rows.

        The lowest free row is preferred. When every visible row is occupied
        the oldest visible post is dropped and its row is handed out again.
        """
        if max_y <= 0:
            return None
        taken = {info['y'] for info in self.posts.values()}
        for row in range(max_y):
            if row not in taken:
                return row
        # Every row in range is taken, so at least one post is on screen;
        # posts stranded off-screen by a resize are skipped here.
        oldest_visible = next(
            pid for pid, info in self.posts.items() if info['y'] < max_y
        )
        return self.posts.pop(oldest_visible)['y']

    def _erase_row(self, row):
        """Blank one screen row, ignoring rows that are no longer on screen."""
        try:
            self.stdscr.move(row, 0)
            self.stdscr.clrtoeol()
        except curses.error:
            pass  # row vanished after a resize; nothing left to erase

    def _display_post(self, post_id, text):
        """Register a post (if new) and redraw every post that is due.

        Args:
            post_id: Hashable identifier; a post already on screen keeps its
                original text, row and color.
            text: Post body; flattened to one line before being stored.
        """
        max_y, max_x = self.stdscr.getmaxyx()

        if post_id not in self.posts:
            # Enforce the memory cap, oldest first, clearing evicted rows.
            while self.posts and len(self.posts) >= self.max_posts:
                evicted = self.posts.pop(next(iter(self.posts)))
                self._erase_row(evicted['y'])

            row = self._claim_row(max_y)
            if row is not None:
                palette = self.color_palette
                color = palette[hash(post_id) % len(palette)] if palette else 0
                self.posts[post_id] = {
                    'text': self._clean_text(text),
                    'color': color,
                    'y': row,
                    'timestamp': time.time(),
                    'fade_count': 0,
                }

        self._render(max_x)

    def _render(self, max_x):
        """Advance the fade of every due post, then refresh the screen."""
        now = time.time()
        # Iterate over a snapshot because finished posts are deleted in place.
        for pid, info in list(self.posts.items()):
            if now - info['timestamp'] < self.FADE_INTERVAL * info['fade_count']:
                continue

            if info['fade_count'] >= self.FADE_STEPS:
                # Fade finished: free the row for the next post.
                self._erase_row(info['y'])
                del self.posts[pid]
                continue

            try:
                attr = (
                    curses.color_pair(info['color'])
                    | self._fade_attr(info['fade_count'])
                )
                self.stdscr.move(info['y'], 0)
                self.stdscr.clrtoeol()
                # Stop one column short of the edge: writing the last cell of
                # the last row makes curses report an error.
                self.stdscr.addstr(
                    info['y'],
                    0,
                    self._fit_to_width(info['text'], max_x - 1),
                    attr,
                )
                info['fade_count'] += 1
            except curses.error:
                # Row is no longer drawable (e.g. after a resize): drop it.
                del self.posts[pid]

        self.stdscr.refresh()

    def _show_error(self, exc):
        """Show ``exc`` on the top row and wait for a key press.

        Without the pause the caller returns immediately, the curses screen
        is torn down, and the message is never visible.
        """
        detail = str(exc) or type(exc).__name__
        _, max_x = self.stdscr.getmaxyx()
        banner = self._fit_to_width(
            self._clean_text(f"Error: {detail} (press any key to exit)"),
            max_x - 1,
        )
        try:
            self.stdscr.move(0, 0)
            self.stdscr.clrtoeol()
            self.stdscr.addstr(0, 0, banner)
            self.stdscr.refresh()
            self.stdscr.nodelay(False)
            self.stdscr.getch()
        except curses.error:
            pass  # screen too small to show anything; nothing more to do

    async def connect_and_visualize(self):
        """Connect to the websocket feed and display posts until it fails.

        Frames that cannot be parsed are skipped. Any other exception ends
        the loop and is reported on screen rather than raised.
        """
        try:
            async with websockets.connect(self.websocket_url) as websocket:
                while True:
                    message = await websocket.recv()
                    parsed = self._parse_message(message)
                    if parsed is None:
                        continue
                    post_id, text = parsed
                    self._display_post(post_id, text)
        except Exception as e:
            # Top-level boundary: report instead of leaving a raw traceback
            # on a half-restored terminal.
            self._show_error(e)

    def run(self):
        """Run the visualizer on a fresh asyncio event loop."""
        asyncio.run(self.connect_and_visualize())


def main(stdscr):
    """Build the visualizer on the given curses screen and run it.

    Intended to be passed to ``curses.wrapper``, which supplies ``stdscr``.
    """
    # Placeholder endpoint: replace with the actual Bluesky firehose
    # websocket URL.
    firehose_url = "ws://example.com/bluesky-firehose"
    BlueskyFirehoseVisualizer(stdscr, firehose_url).run()


if __name__ == "__main__":
    # curses.wrapper initialises the terminal, passes the screen to main,
    # and restores the terminal state when main returns or raises.
    curses.wrapper(main)

# Dependencies (install with pip):
#   websockets
#
# Note: replace the placeholder websocket URL in main() with the actual
# Bluesky firehose websocket endpoint when available.