2024-11-21 20:29:24 -08:00
import asyncio
import json
import websockets
import random
from colorama import init , Fore , Style
import munch
import colorsys
import math
import fire
import logging
import os
level = os . environ . get ( " LOGLEVEL " , logging . INFO )
try : level = int ( level )
except : level = level . upper ( )
logging . basicConfig ( level = level )
logger = logging . getLogger ( " bfire " )
# Initialize colorama for cross-platform colored output
init ( autoreset = True )
class BlueskyFirehosePrinter :
def __init__ ( self ) :
# Preset list of vibrant colors
self . colors = [
Fore . RED , Fore . GREEN , Fore . YELLOW , Fore . BLUE ,
Fore . MAGENTA , Fore . CYAN , Fore . WHITE ,
Fore . LIGHTRED_EX , Fore . LIGHTGREEN_EX ,
Fore . LIGHTYELLOW_EX , Fore . LIGHTBLUE_EX ,
Fore . LIGHTMAGENTA_EX , Fore . LIGHTCYAN_EX
]
def _get_post_color ( self , ts ) :
""" Deterministically select a color based on post ID^W^W timestamp """
# Use hash of post ID to consistently select a color
#return self.colors[hash(post_id) % len(self.colors)]
return self . colors [ int ( ts ) % len ( self . colors ) ]
def _extract_post_text ( self , post ) :
"""
Extract meaningful text from a post .
Modify this method based on the actual Bluesky post JSON structure .
"""
# Example of potential extraction, will need to be adapted
if isinstance ( post , dict ) :
# Try different possible text fields
text = post . get ( ' text ' ) or \
post . get ( ' content ' , { } ) . get ( ' text ' ) or \
str ( post )
return text [ : 200 ] # Limit text length
return str ( post )
def _hsv_termcolor ( h , s , v ) :
""" [0,1] h, s, v -> 256 color terminal codes """
assert ( h < = 1 and h > = 0 ) , " h "
assert ( s < = 1 and s > = 0 ) , " s "
assert ( v < = 1 and v > = 0 ) , " v "
rgb1 = colorsys . hsv_to_rgb ( h , s , v )
rgb256 = list ( int ( i * 255 ) for i in rgb1 )
colorstr = " \033 [38;2; {} ; {} ; {} m " . format ( * rgb256 )
return colorstr
async def connect_and_print ( self , websocket_url , skips = [ ] , onlys = [ ] , count = None , cfilters = { } , fkeeps = [ ] , fdrops = [ ] ) :
""" Connect to websocket and print posts """
n = 0
try :
async with websockets . connect ( websocket_url ) as websocket :
print ( f " Connected to { websocket_url } " )
while True :
try :
eventws = await websocket . recv ( )
# Parse event
try :
post = json . loads ( eventws )
post = munch . munchify ( post )
except json . JSONDecodeError :
post = " err: " + eventws
2025-02-16 19:11:52 -08:00
#try: post.type
#except Exception as e:
# logger.debug(f"no type on post object. {e=} {post.toJSON()=}")
# continue
#try: post.record["$type"]
#except Exception as e:
# logger.debug(f"no post.commit.$type key. {e=} {post.commit.toJSON()=}")
# continue
# s/type/kind/g -- com -> commit
if onlys and post . kind not in onlys : continue
if post . kind in skips : continue
2024-11-21 20:29:24 -08:00
# type in ["com", "id", "acc"] # [com]mit, [id]entity, [acc]ount, ..? [del]ete? or commit type
ts = post . time_us / / 10e3
# Select color based on post ID
#color = self._get_post_color(post.time_us//10e4)
2025-02-16 19:11:52 -08:00
hsv1 = [ ( ts / 4 % 255 ) / 255 , .8 , .8 ]
2024-11-21 20:29:24 -08:00
# Generate a unique post ID
#post_id = str(hash(json.dumps(post)))
post_id = post . get ( " did " , " r: " + str ( random . random ( ) ) )
# Extract and print text
#text = self._extract_post_text(post)
try :
2025-02-16 19:11:52 -08:00
#if post.type in ["com"]:
if post . kind in [ " commit " ] :
if cfilters . get ( " - " ) and any ( map ( lambda w : w in post . commit . type , cfilters . get ( " - " ) ) ) : continue
if cfilters . get ( " + " ) and not any ( map ( lambda w : w in post . commit . type , cfilters . get ( " + " ) ) ) : continue
if fdrops and any ( map ( lambda w : w in post . commit . record . text , fdrops ) ) : continue
if fkeeps and not any ( map ( lambda w : w in post . commit . record . text , fkeeps ) ) : continue
2024-11-21 20:29:24 -08:00
if post . commit . record . text :
text = post . commit . record . text
else :
text = f " post.commit.record= { post . commit . record . toJSON ( ) } "
hsv1 [ 2 ] = 1 - min ( 1 , math . log ( len ( text ) ) / math . log ( 256 * 16 ) )
except Exception as e :
text = str ( post . toJSON ( ) )
hsv1 [ 1 ] = .8
#hsv1[2] = 1-min(1, math.log(len(text))/math.log(256*16)) ## ~ 80
hsv1 [ 2 ] = 60 / 255
hsv1 [ 2 ] = 120 / 255
#color = "\033[38;2;%s;%s;%sm" % (64,64,64) # /255 ea
# Red\033[0m
#Fore.LIGHTWHITE_EX
if count is not None :
n + = 1
if n > count :
return
# 0 - 500 5 vs 30 vs 90
# [h,s,v]
#int(255 * math.log(len(text)))
#rgb256[2] = min(255, len(text))
#rgb256[1] = max(8, 255-len(text))
#hsv1[1] = max(8, 255-len(text))/256
#hsv1[2] = (max(0, 255-len(text)))/256
### hsv1[2] = 1-min(1, math.log(len(text))/math.log(256*16))
#(max(0, 255-len(text)))/256
rgb1 = colorsys . hsv_to_rgb ( * hsv1 )
rgb256 = list ( int ( i * 255 ) for i in rgb1 )
#logger.info(f"{rgb256=}")
color = " \033 [38;2; {} ; {} ; {} m " . format ( * rgb256 )
hsv1s = " : " . join ( f " { x : .1 } " for x in hsv1 )
ihsv1 = list ( hsv1 )
#ihsv1[1], ihsv1[2] *= .1, .4
ihsv1 [ 2 ] * = .4 # v -- non- dark/black -ness
ihsv1 [ 1 ] * = .1 # s -- color sat
ihsv1 [ 2 ] = max ( .3 , ihsv1 [ 2 ] )
irgb1 = colorsys . hsv_to_rgb ( * ihsv1 )
irgb256 = list ( int ( i * 255 ) for i in irgb1 )
infocolor = " \033 [38;2; {} ; {} ; {} m " . format ( * irgb256 )
# Print colored post
try :
if post . type == " com " :
print ( f ' { infocolor } { int ( ts ) } |type: { getattr ( post , " type " , None ) } | { color } { text } { infocolor } |hsv: { hsv1s } type: { getattr ( post , " type " , None ) } kind: { getattr ( post , " kind " , None ) } { post . commit . type =} { post . commit . operation =} ' )
else :
ihsv1 [ 1 ] * = .3
ihsv1 [ 2 ] = 1
infocolor = _hsv_termcolor ( * ihsv1 )
print ( f ' { infocolor } { int ( ts ) } |type: { getattr ( post , " type " , None ) } | { color } { text } { infocolor } |hsv: { hsv1s } type: { getattr ( post , " type " , None ) } kind: { getattr ( post , " kind " , None ) } ' )
except Exception as e :
print ( f ' { infocolor } { int ( ts ) } | { color } { text } { infocolor } |hsv: { hsv1s } type: { getattr ( post , " type " , None ) } kind: { getattr ( post , " kind " , None ) } -- no post commit ' )
except Exception as e :
print ( f " Error processing event: { e } " )
raise e
except Exception as e :
print ( f " Websocket connection error: { e } " )
async def main ( skips = [ ] , onlys = [ ] , count = None , cfilters = { } , fkeeps = [ ] , fdrops = [ ] ) :
# Replace with actual Bluesky firehose websocket URL
BLUESKY_FIREHOSE_WS = " wss://jetstream2.us-east.bsky.network/subscribe?wantedCollections=app.bsky.feed.post "
printer = BlueskyFirehosePrinter ( )
await printer . connect_and_print ( BLUESKY_FIREHOSE_WS , skips = skips , onlys = onlys , count = count , cfilters = cfilters , fkeeps = fkeeps , fdrops = fdrops )
def cli_main ( skips = " " , only = " " , cfilters = " " , filters = " " , count = None ) :
"""
run the async func .
- - skip = [ value ] [ , value ] *
- - only = [ value ] [ , value ] *
- - filters = + include , - skip , + more , - nope
- - cfilters = - - commit types : delete , create , ( reply ? ) , ( post ? )
- - count = [ n ] - - stop after
"""
# Run the async main function
skips = skips . split ( " , " )
2025-02-16 19:11:52 -08:00
onlys = only # onlys = list(only)# onlys = only.split(",")
2024-11-21 20:29:24 -08:00
cfs = cfilters . split ( " , " )
cfilters = { " + " : [ f [ 1 : ] for f in cfs if f [ : 1 ] == " + " ] ,
" - " : [ f [ 1 : ] for f in cfs if f [ : 1 ] == " - " ] }
fs = filters . split ( " , " )
fkeeps = [ f [ 1 : ] for f in fs if f [ : 1 ] == " + " ]
fdrops = [ f [ 1 : ] for f in fs if f [ : 1 ] == " - " ]
try :
asyncio . run ( main ( skips = skips , onlys = onlys , count = count , cfilters = cfilters , fkeeps = fkeeps , fdrops = fdrops ) )
except KeyboardInterrupt as kb :
print ( " done " )
if __name__ == " __main__ " :
fire . Fire ( cli_main )
# Dependencies:
# pip install websockets colorama
#
# Notes:
# 1. Replace BLUESKY_FIREHOSE_WS with actual websocket URL
# 2. The post extraction method (_extract_post_text)
# will likely need customization based on the
# actual Bluesky firehose JSON structure