diff --git a/bsky.py b/bsky.py index 532805e..9d3f562 100644 --- a/bsky.py +++ b/bsky.py @@ -24,10 +24,11 @@ from tools.blocks import attach_user_blocks, detach_user_blocks from datetime import date from notification_db import NotificationDB + def extract_handles_from_data(data): """Recursively extract all unique handles from nested data structure.""" handles = set() - + def _extract_recursive(obj): if isinstance(obj, dict): # Check if this dict has a 'handle' key @@ -40,10 +41,11 @@ def extract_handles_from_data(data): # Recursively check all list items for item in obj: _extract_recursive(item) - + _extract_recursive(data) return list(handles) + # Logging will be configured after argument parsing logger = None prompt_logger = None @@ -51,20 +53,21 @@ prompt_logger = None SHOW_REASONING = False last_archival_query = "archival memory search" + def log_with_panel(message, title=None, border_color="white"): """Log a message with Unicode box-drawing characters""" if title: # Map old color names to appropriate symbols symbol_map = { - "blue": "⚙", # Tool calls - "green": "✓", # Success/completion - "yellow": "◆", # Reasoning - "red": "✗", # Errors - "white": "▶", # Default/mentions - "cyan": "✎", # Posts + "blue": "⚙", # Tool calls + "green": "✓", # Success/completion + "yellow": "◆", # Reasoning + "red": "✗", # Errors + "white": "▶", # Default/mentions + "cyan": "✎", # Posts } symbol = symbol_map.get(border_color, "▶") - + print(f"\n{symbol} {title}") print(f" {'─' * len(title)}") # Indent message lines @@ -97,6 +100,10 @@ MAX_PROCESSED_NOTIFICATIONS = 10000 message_counters = defaultdict(int) start_time = time.time() +# Synthesis cycle configuration (default set in argparse) +SYNTHESIS_CYCLES = 10 # number of successful mentions to trigger synthesis in normal mode +MENTIONS_SINCE_LAST_SYNTHESIS = 0 + # Testing mode flag TESTING_MODE = False @@ -109,6 +116,7 @@ last_synthesis_time = time.time() # Database for notification tracking NOTIFICATION_DB = None + def export_agent_state(client, agent, skip_git=False): """Export agent state to agent_archive/ (timestamped) and agents/ (current).""" try: @@ -120,28 +128,28 @@ def export_agent_state(client, agent, skip_git=False): return else: logger.info("Exporting agent state (git staging disabled)") - + # Create directories if they don't exist os.makedirs("agent_archive", exist_ok=True) os.makedirs("agents", exist_ok=True) - + # Export agent data logger.info(f"Exporting agent {agent.id}. This takes some time...") agent_data = client.agents.export_file(agent_id=agent.id) - + # Save timestamped archive copy timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") archive_file = os.path.join("agent_archive", f"void_{timestamp}.af") with open(archive_file, 'w', encoding='utf-8') as f: json.dump(agent_data, f, indent=2, ensure_ascii=False) - + # Save current agent state current_file = os.path.join("agents", "void.af") with open(current_file, 'w', encoding='utf-8') as f: json.dump(agent_data, f, indent=2, ensure_ascii=False) - + logger.info(f"Agent exported to {archive_file} and {current_file}") - + # Git add only the current agent file (archive is ignored) unless skip_git is True if not skip_git: try: @@ -149,17 +157,18 @@ def export_agent_state(client, agent, skip_git=False): logger.info("Added current agent file to git staging") except subprocess.CalledProcessError as e: logger.warning(f"Failed to git add agent file: {e}") - + except Exception as e: logger.error(f"Failed to export agent: {e}") + def initialize_void(): logger.info("Starting void agent initialization...") # Get the configured void agent by ID logger.info("Loading void agent from config...") agent_id = letta_config['agent_id'] - + try: void_agent = CLIENT.agents.retrieve(agent_id=agent_id) logger.info(f"Successfully loaded void agent: {void_agent.name} ({agent_id})") @@ -167,11 +176,11 @@ def initialize_void(): logger.error(f"Failed to load void agent {agent_id}: {e}") logger.error("Please ensure the agent_id in config.yaml is correct") raise e - + # Export agent state logger.info("Exporting agent state...") export_agent_state(CLIENT, void_agent, skip_git=SKIP_GIT) - + # Log agent details logger.info(f"Void agent details - ID: {void_agent.id}") logger.info(f"Agent name: {void_agent.name}") @@ -203,16 +212,16 @@ def process_mention(void_agent, atproto_client, notification_data, queue_filepat "no_reply": No reply was generated, move to no_reply directory """ import uuid - + # Generate correlation ID for tracking this notification through the pipeline correlation_id = str(uuid.uuid4())[:8] - + try: logger.info(f"[{correlation_id}] Starting process_mention", extra={ 'correlation_id': correlation_id, 'notification_type': type(notification_data).__name__ }) - + # Handle both dict and object inputs for backwards compatibility if isinstance(notification_data, dict): uri = notification_data['uri'] @@ -225,7 +234,7 @@ def process_mention(void_agent, atproto_client, notification_data, queue_filepat mention_text = notification_data.record.text if hasattr(notification_data.record, 'text') else "" author_handle = notification_data.author.handle author_name = notification_data.author.display_name or author_handle - + logger.info(f"[{correlation_id}] Processing mention from @{author_handle}", extra={ 'correlation_id': correlation_id, 'author_handle': author_handle, @@ -258,7 +267,8 @@ def process_mention(void_agent, atproto_client, notification_data, queue_filepat if max_thread_posts > 0: thread_post_count = count_thread_posts(thread) if thread_post_count > max_thread_posts: - logger.info(f"Thread too long ({thread_post_count} posts > {max_thread_posts} max), skipping this mention") + logger.info( + f"Thread too long ({thread_post_count} posts > {max_thread_posts} max), skipping this mention") return True # Return True to remove from queue # Get thread context as YAML string @@ -266,32 +276,33 @@ def process_mention(void_agent, atproto_client, notification_data, queue_filepat try: thread_context = thread_to_yaml_string(thread) logger.debug(f"Thread context generated, length: {len(thread_context)} characters") - + # Check if #voidstop appears anywhere in the thread if "#voidstop" in thread_context.lower(): logger.info("Found #voidstop in thread context, skipping this mention") return True # Return True to remove from queue - + # Also check the mention text directly if "#voidstop" in mention_text.lower(): logger.info("Found #voidstop in mention text, skipping this mention") return True # Return True to remove from queue - + # Create a more informative preview by extracting meaningful content lines = thread_context.split('\n') meaningful_lines = [] - + for line in lines: stripped = line.strip() if not stripped: continue - + # Look for lines with actual content (not just structure) - if any(keyword in line for keyword in ['text:', 'handle:', 'display_name:', 'created_at:', 'reply_count:', 'like_count:']): + if any(keyword in line for keyword in + ['text:', 'handle:', 'display_name:', 'created_at:', 'reply_count:', 'like_count:']): meaningful_lines.append(line) if len(meaningful_lines) >= 5: break - + if meaningful_lines: preview = '\n'.join(meaningful_lines) logger.debug(f"Thread content preview:\n{preview}") @@ -326,24 +337,24 @@ If you choose to reply, use the add_post_to_bluesky_reply_thread tool. Each call all_handles.update(extract_handles_from_data(notification_data)) all_handles.update(extract_handles_from_data(thread.model_dump())) unique_handles = list(all_handles) - + logger.debug(f"Found {len(unique_handles)} unique handles in thread: {unique_handles}") - + # Check if any handles are in known_bots list from tools.bot_detection import check_known_bots, should_respond_to_bot_thread, CheckKnownBotsArgs import json - + try: # Check for known bots in thread bot_check_result = check_known_bots(unique_handles, void_agent) bot_check_data = json.loads(bot_check_result) - + # TEMPORARILY DISABLED: Bot detection causing issues with normal users # TODO: Re-enable after debugging why normal users are being flagged as bots if False: # bot_check_data.get("bot_detected", False): detected_bots = bot_check_data.get("detected_bots", []) logger.info(f"Bot detected in thread: {detected_bots}") - + # Decide whether to respond (10% chance) if not should_respond_to_bot_thread(): logger.info(f"Skipping bot thread (90% skip rate). Detected bots: {detected_bots}") @@ -353,11 +364,11 @@ If you choose to reply, use the add_post_to_bluesky_reply_thread tool. Each call logger.info(f"Responding to bot thread (10% response rate). Detected bots: {detected_bots}") else: logger.debug("Bot detection disabled - processing all notifications") - + except Exception as bot_check_error: logger.warning(f"Error checking for bots: {bot_check_error}") # Continue processing if bot check fails - + # Attach user blocks before agent call attached_handles = [] if unique_handles: @@ -378,14 +389,15 @@ If you choose to reply, use the add_post_to_bluesky_reply_thread tool. Each call # Indent the mention text for line in mention_text.split('\n'): print(f" {line}") - + # Log prompt details to separate logger prompt_logger.debug(f"Full prompt being sent:\n{prompt}") - + # Log concise prompt info to main logger thread_handles_count = len(unique_handles) prompt_char_count = len(prompt) - logger.debug(f"Sending to LLM: @{author_handle} mention | msg: \"{mention_text[:50]}...\" | context: {len(thread_context)} chars, {thread_handles_count} users | prompt: {prompt_char_count} chars") + logger.debug( + f"Sending to LLM: @{author_handle} mention | msg: \"{mention_text[:50]}...\" | context: {len(thread_context)} chars, {thread_handles_count} users | prompt: {prompt_char_count} chars") try: # Use streaming to avoid 524 timeout errors @@ -395,7 +407,7 @@ If you choose to reply, use the add_post_to_bluesky_reply_thread tool. Each call stream_tokens=False, # Step streaming only (faster than token streaming) max_steps=100 ) - + # Collect the streaming response all_messages = [] for chunk in message_stream: @@ -418,7 +430,7 @@ If you choose to reply, use the add_post_to_bluesky_reply_thread tool. Each call # Indent reasoning lines for line in chunk.reasoning.split('\n'): print(f" {line}") - + # Create ATProto record for reasoning (unless in testing mode) if not testing_mode and hasattr(chunk, 'reasoning'): try: @@ -428,20 +440,21 @@ If you choose to reply, use the add_post_to_bluesky_reply_thread tool. Each call elif chunk.message_type == 'tool_call_message': # Parse tool arguments for better display tool_name = chunk.tool_call.name - + # Create ATProto record for tool call (unless in testing mode) if not testing_mode: try: - tool_call_id = chunk.tool_call.tool_call_id if hasattr(chunk.tool_call, 'tool_call_id') else None + tool_call_id = chunk.tool_call.tool_call_id if hasattr(chunk.tool_call, + 'tool_call_id') else None bsky_utils.create_tool_call_record( - atproto_client, - tool_name, + atproto_client, + tool_name, chunk.tool_call.arguments, tool_call_id ) except Exception as e: logger.debug(f"Failed to create tool call record: {e}") - + try: args = json.loads(chunk.tool_call.arguments) # Format based on tool type @@ -456,7 +469,8 @@ If you choose to reply, use the add_post_to_bluesky_reply_thread tool. Each call for line in text.split('\n'): print(f" {line}") else: - log_with_panel(chunk.tool_call.arguments[:150] + "...", f"Tool call: {tool_name}", "blue") + log_with_panel(chunk.tool_call.arguments[:150] + "...", f"Tool call: {tool_name}", + "blue") elif tool_name == 'archival_memory_search': query = args.get('query', 'unknown') global last_archival_query @@ -468,7 +482,8 @@ If you choose to reply, use the add_post_to_bluesky_reply_thread tool. Each call log_with_panel(content, f"Tool call: {tool_name}", "blue") elif tool_name == 'update_block': label = args.get('label', 'unknown') - value_preview = str(args.get('value', ''))[:50] + "..." if len(str(args.get('value', ''))) > 50 else str(args.get('value', '')) + value_preview = str(args.get('value', ''))[:50] + "..." if len( + str(args.get('value', ''))) > 50 else str(args.get('value', '')) log_with_panel(f"{label}: \"{value_preview}\"", f"Tool call: {tool_name}", "blue") else: # Generic display for other tools @@ -483,13 +498,13 @@ If you choose to reply, use the add_post_to_bluesky_reply_thread tool. Each call # Enhanced tool result logging tool_name = chunk.name status = chunk.status - + if status == 'success': # Try to show meaningful result info based on tool type if hasattr(chunk, 'tool_return') and chunk.tool_return: result_str = str(chunk.tool_return) if tool_name == 'archival_memory_search': - + try: # Handle both string and list formats if isinstance(chunk.tool_return, str): @@ -500,12 +515,13 @@ If you choose to reply, use the add_post_to_bluesky_reply_thread tool. Each call start_idx = chunk.tool_return.find('[') end_idx = chunk.tool_return.rfind(']') if start_idx != -1 and end_idx != -1: - list_str = chunk.tool_return[start_idx:end_idx+1] + list_str = chunk.tool_return[start_idx:end_idx + 1] # Use ast.literal_eval since this is Python literal syntax, not JSON import ast results = ast.literal_eval(list_str) else: - logger.warning("Could not find list in archival_memory_search result") + logger.warning( + "Could not find list in archival_memory_search result") results = [] else: logger.warning("Empty string returned from archival_memory_search") @@ -513,19 +529,20 @@ If you choose to reply, use the add_post_to_bluesky_reply_thread tool. Each call else: # If it's already a list, use directly results = chunk.tool_return - - log_with_panel(f"Found {len(results)} memory entries", f"Tool result: {tool_name} ✓", "green") - + + log_with_panel(f"Found {len(results)} memory entries", + f"Tool result: {tool_name} ✓", "green") + # Use the captured search query from the tool call search_query = last_archival_query - + # Combine all results into a single text block content_text = "" for i, entry in enumerate(results, 1): timestamp = entry.get('timestamp', 'N/A') content = entry.get('content', '') content_text += f"[{i}/{len(results)}] {timestamp}\n{content}\n\n" - + # Format with Unicode characters title = f"{search_query} ({len(results)} results)" print(f"\n⚙ {title}") @@ -533,7 +550,7 @@ If you choose to reply, use the add_post_to_bluesky_reply_thread tool. Each call # Indent content text for line in content_text.strip().split('\n'): print(f" {line}") - + except Exception as e: logger.error(f"=== ERROR IN MAIN LOOP CYCLE {cycle_count} ===")