···
16
+
last_river_post_date: float option; (** Timestamp of last River post for this user *)
(** Parse a user registration from JSON-like string format *)
let user_registration_of_string s : user_registration option =
21
-
(* Format: "email|zulip_id|full_name|timestamp|is_admin" *)
22
+
(* Format: "email|zulip_id|full_name|timestamp|is_admin|last_river_post_date" *)
match String.split_on_char '|' s with
24
+
| [email; zulip_id_str; full_name; timestamp_str; is_admin_str; last_river_str] ->
25
+
let last_river_post_date =
26
+
if last_river_str = "" || last_river_str = "none" then None
27
+
else Some (float_of_string last_river_str)
31
+
zulip_id = int_of_string zulip_id_str;
33
+
registered_at = float_of_string timestamp_str;
34
+
is_admin = bool_of_string is_admin_str;
35
+
last_river_post_date;
| [email; zulip_id_str; full_name; timestamp_str; is_admin_str] ->
38
+
(* Backward compatibility - old format without last_river_post_date *)
zulip_id = int_of_string zulip_id_str;
registered_at = float_of_string timestamp_str;
is_admin = bool_of_string is_admin_str;
45
+
last_river_post_date = None;
| [email; zulip_id_str; full_name; timestamp_str] ->
32
-
(* Backward compatibility - old format without is_admin *)
48
+
(* Backward compatibility - old format without is_admin and last_river_post_date *)
zulip_id = int_of_string zulip_id_str;
registered_at = float_of_string timestamp_str;
55
+
last_river_post_date = None;
(** Convert a user registration to string format *)
let user_registration_to_string (reg : user_registration) : string =
45
-
Printf.sprintf "%s|%d|%s|%f|%b"
62
+
let last_river_str = match reg.last_river_post_date with
64
+
| Some t -> string_of_float t
66
+
Printf.sprintf "%s|%d|%s|%f|%b|%s"
(** Storage key for a user registration by Zulip ID - this is the only storage key we use *)
let storage_key_for_id zulip_id = Printf.sprintf "user:id:%d" zulip_id
···
| None -> is_admin || (zulip_id = default_admin_id)
153
+
(* Preserve last_river_post_date if user already exists *)
154
+
let last_river_post_date = match existing_by_id with
155
+
| Some existing -> existing.last_river_post_date
registered_at = Unix.gettimeofday ();
is_admin = final_is_admin;
165
+
last_river_post_date;
let reg_str = user_registration_to_string reg in
···
String.contains domain '.'
210
+
(** {1 River Integration Helper Functions} *)
212
+
(** Configuration storage keys for River *)
213
+
let river_feeds_key = "river:feeds:list"
214
+
let river_channel_key = "river:channel"
215
+
let river_polling_enabled_key = "river:polling:enabled"
216
+
let river_last_sync_key = "river:last_sync"
217
+
let river_default_channel = "Sandbox-test"
219
+
(** Feed source codec *)
220
+
let feed_source_jsont =
221
+
let make name url = { River.name; url } in
222
+
Jsont.Object.map ~kind:"FeedSource" make
223
+
|> Jsont.Object.mem "name" Jsont.string ~enc:(fun s -> s.River.name)
224
+
|> Jsont.Object.mem "url" Jsont.string ~enc:(fun s -> s.River.url)
225
+
|> Jsont.Object.finish
227
+
(** Load feed sources from bot storage *)
228
+
let load_feed_sources storage =
229
+
match Bot_storage.get storage ~key:river_feeds_key with
230
+
| Some json_str when json_str <> "" ->
231
+
(match Jsont_bytesrw.decode_string' (Jsont.list feed_source_jsont) json_str with
233
+
Log.debug (fun m -> m "Loaded %d feed sources" (List.length feeds));
236
+
Log.err (fun m -> m "Failed to parse feed sources: %s" (Jsont.Error.to_string err));
239
+
Log.debug (fun m -> m "No feed sources configured");
242
+
(** Save feed sources to bot storage *)
243
+
let save_feed_sources storage feeds =
244
+
match Jsont_bytesrw.encode_string' ~format:Jsont.Indent (Jsont.list feed_source_jsont) feeds with
246
+
Bot_storage.put storage ~key:river_feeds_key ~value:json_str
248
+
let msg = Printf.sprintf "Failed to encode feed sources: %s" (Jsont.Error.to_string err) in
249
+
Error (Zulip.create_error ~code:(Other "encoding_error") ~msg ())
251
+
(** Add a feed source *)
252
+
let add_feed storage ~name ~url =
253
+
let feeds = load_feed_sources storage in
254
+
if List.exists (fun f -> f.River.url = url) feeds then
255
+
Error (Zulip.create_error ~code:(Other "already_exists")
256
+
~msg:(Printf.sprintf "Feed with URL %s already exists" url) ())
258
+
let new_feed = { River.name; url } in
259
+
save_feed_sources storage (new_feed :: feeds)
261
+
(** Remove a feed source *)
262
+
let remove_feed storage ~name =
263
+
let feeds = load_feed_sources storage in
264
+
let updated_feeds = List.filter (fun f -> f.River.name <> name) feeds in
265
+
if List.length updated_feeds = List.length feeds then
266
+
Error (Zulip.create_error ~code:(Other "not_found")
267
+
~msg:(Printf.sprintf "Feed '%s' not found" name) ())
269
+
save_feed_sources storage updated_feeds
271
+
(** Get target Zulip channel *)
272
+
let get_river_channel storage =
273
+
match Bot_storage.get storage ~key:river_channel_key with
274
+
| Some ch when ch <> "" -> ch
275
+
| _ -> river_default_channel
277
+
(** Set target Zulip channel *)
278
+
let set_river_channel storage channel =
279
+
Bot_storage.put storage ~key:river_channel_key ~value:channel
281
+
(** Check if polling is enabled *)
282
+
let is_river_polling_enabled storage =
283
+
match Bot_storage.get storage ~key:river_polling_enabled_key with
284
+
| Some "true" -> true
287
+
(** Enable polling *)
288
+
let enable_river_polling storage =
289
+
Bot_storage.put storage ~key:river_polling_enabled_key ~value:"true"
291
+
(** Disable polling *)
292
+
let disable_river_polling storage =
293
+
Bot_storage.put storage ~key:river_polling_enabled_key ~value:"false"
295
+
(** Get last sync timestamp *)
296
+
let get_river_last_sync storage =
297
+
match Bot_storage.get storage ~key:river_last_sync_key with
298
+
| Some ts_str when ts_str <> "" ->
299
+
(try Some (float_of_string ts_str) with _ -> None)
302
+
(** Update last sync timestamp *)
303
+
let update_river_last_sync storage timestamp =
304
+
Bot_storage.put storage ~key:river_last_sync_key ~value:(string_of_float timestamp)
306
+
(** {1 Command Handlers} *)
(** Handle the 'register' command *)
let handle_register storage sender_email sender_id sender_name custom_email_opt =
(* First, try to fetch the user's profile from the Zulip API to get delivery_email and email *)
···
(String.concat "\n" user_lines)
498
+
(** Handle River 'feeds' command *)
499
+
let handle_river_feeds storage =
500
+
let feeds = load_feed_sources storage in
502
+
"📡 No River feeds configured yet.\n\nUse `river add-feed <name> <url>` to add a feed."
504
+
let feed_lines = List.mapi (fun i feed ->
505
+
Printf.sprintf "%d. **%s**\n URL: `%s`" (i + 1) feed.River.name feed.River.url
507
+
Printf.sprintf "📡 Configured River feeds (%d):\n\n%s\n\nChannel: #%s"
508
+
(List.length feeds)
509
+
(String.concat "\n\n" feed_lines)
510
+
(get_river_channel storage)
512
+
(** Handle River 'add-feed' command *)
513
+
let handle_river_add_feed storage args =
514
+
match String.split_on_char ' ' args |> List.filter (fun s -> s <> "") with
515
+
| name :: url_parts ->
516
+
let url = String.concat " " url_parts in
517
+
(match add_feed storage ~name ~url with
519
+
Printf.sprintf "✅ Added feed **%s**\n URL: `%s`\n\nUse `river sync` to fetch posts." name url
521
+
Printf.sprintf "❌ Failed to add feed: %s" (Zulip.error_message e))
523
+
"Usage: `river add-feed <name> <url>`\n\nExample: `river add-feed \"OCaml Blog\" https://ocaml.org/blog/feed.xml`"
525
+
(** Handle River 'remove-feed' command *)
526
+
let handle_river_remove_feed storage args =
527
+
let name = String.trim args in
529
+
"Usage: `river remove-feed <name>`\n\nExample: `river remove-feed \"OCaml Blog\"`"
531
+
match remove_feed storage ~name with
533
+
Printf.sprintf "✅ Removed feed: **%s**" name
535
+
Printf.sprintf "❌ Failed to remove feed: %s" (Zulip.error_message e)
537
+
(** Handle River 'set-channel' command *)
538
+
let handle_river_set_channel storage args =
539
+
let channel = String.trim args in
540
+
if channel = "" then
541
+
Printf.sprintf "Current channel: #%s\n\nUsage: `river set-channel <channel-name>`\n\nExample: `river set-channel general`"
542
+
(get_river_channel storage)
544
+
match set_river_channel storage channel with
546
+
Printf.sprintf "✅ River posts will now go to #%s" channel
548
+
Printf.sprintf "❌ Failed to set channel: %s" (Zulip.error_message e)
550
+
(** Handle River 'start' command *)
551
+
let handle_river_start storage =
552
+
match enable_river_polling storage with
553
+
| Ok () -> "✅ River polling enabled. Feeds will be checked every 5 minutes."
554
+
| Error e -> Printf.sprintf "❌ Failed to enable polling: %s" (Zulip.error_message e)
556
+
(** Handle River 'stop' command *)
557
+
let handle_river_stop storage =
558
+
match disable_river_polling storage with
559
+
| Ok () -> "⏸️ River polling disabled. Use `river start` to resume."
560
+
| Error e -> Printf.sprintf "❌ Failed to disable polling: %s" (Zulip.error_message e)
562
+
(** Handle River 'status' command *)
563
+
let handle_river_status storage =
564
+
let feeds = load_feed_sources storage in
565
+
let polling_status = if is_river_polling_enabled storage then "✅ Enabled" else "⏸️ Disabled" in
566
+
let last_sync = match get_river_last_sync storage with
567
+
| Some ts -> format_timestamp ts
570
+
Printf.sprintf "📊 River Feed Integration Status:\n\
572
+
• Target channel: #%s\n\
573
+
• Feeds configured: %d\n\
576
+
(get_river_channel storage)
577
+
(List.length feeds)
(** Handle the 'help' command *)
let handle_help sender_name sender_email =
373
-
Printf.sprintf "👋 Hi %s! I'm **Vicuna**, your user registration assistant.\n\n\
374
-
**Available Commands:**\n\
582
+
Printf.sprintf "👋 Hi %s! I'm **Vicuna**, your user registration and feed aggregation assistant.\n\n\
583
+
**User Registration Commands:**\n\
• `register` - Auto-detect your real email or use Zulip email\n\
• `register <your-email@example.com>` - Register with a specific email\n\
• `whoami` - Show your registration status\n\
• `whois <email|id>` - Look up a registered user\n\
379
-
• `list` - List all registered users\n\
588
+
• `list` - List all registered users\n\n\
589
+
**River Feed Commands:**\n\
590
+
• `river feeds` - List all configured feeds\n\
591
+
• `river add-feed <name> <url>` - Add a new feed\n\
592
+
• `river remove-feed <name>` - Remove a feed\n\
593
+
• `river sync` - Force immediate feed sync\n\
594
+
• `river status` - Show River integration status\n\
595
+
• `river set-channel <name>` - Set target Zulip channel\n\
596
+
• `river start` - Enable automatic polling\n\
597
+
• `river stop` - Disable automatic polling\n\
• `help` - Show this help message\n\n\
• `register` - Auto-detect your email (your Zulip email is `%s`)\n\
383
-
• `register alice@mycompany.com` - Register with a specific email\n\
384
-
• `whois alice@example.com` - Look up Alice by email\n\
385
-
• `whois 12345` - Look up user by Zulip ID\n\n\
386
-
**Smart Email Detection:**\n\
387
-
When you use `register` without an email, I'll try to:\n\
388
-
1. Find your delivery email from your Zulip profile (delivery_email)\n\
389
-
2. Use your profile email if available (user.email)\n\
390
-
3. Fall back to your Zulip message email if needed\n\n\
391
-
This means you usually don't need to manually provide your email!\n\n\
601
+
• `river add-feed \"OCaml Weekly\" https://ocaml.org/feed.xml`\n\
602
+
• `river set-channel sandbox-test`\n\n\
Send me a direct message to get started!"
···
handle_whois storage args
671
+
(* Parse river subcommand *)
672
+
let (subcmd, subargs) = parse_command args in
673
+
let subcmd_lower = String.lowercase_ascii subcmd in
674
+
(match subcmd_lower with
675
+
| "" | "feeds" | "list" ->
676
+
handle_river_feeds storage
677
+
| "add-feed" | "add" ->
678
+
handle_river_add_feed storage subargs
679
+
| "remove-feed" | "remove" | "rm" ->
680
+
handle_river_remove_feed storage subargs
681
+
| "set-channel" | "channel" ->
682
+
handle_river_set_channel storage subargs
683
+
| "start" | "enable" ->
684
+
handle_river_start storage
685
+
| "stop" | "disable" ->
686
+
handle_river_stop storage
688
+
handle_river_status storage
690
+
"⏳ Syncing River feeds... (Note: sync requires environment access, use CLI for now)"
692
+
Printf.sprintf "Unknown river command: `%s`\n\nAvailable: feeds, add-feed, remove-feed, set-channel, start, stop, status, sync" subcmd)
Printf.sprintf "Unknown command: `%s`. Use `help` to see available commands." command
···
| Ok () -> Bot_storage.remove storage ~key
734
+
(** Normalize a name for fuzzy matching *)
735
+
let normalize_name name =
737
+
|> String.lowercase_ascii
739
+
|> Str.global_replace (Str.regexp "[ \t\n\r]+") " "
741
+
(** Match user by exact name *)
742
+
let lookup_user_by_name_exact storage name =
743
+
let all_ids = get_all_user_ids storage in
744
+
List.find_map (fun id ->
745
+
match lookup_user_by_id storage id with
746
+
| Some user when user.full_name = name -> Some user
750
+
(** Match user by fuzzy name *)
751
+
let lookup_user_by_name_fuzzy storage name =
752
+
let normalized_query = normalize_name name in
753
+
let all_ids = get_all_user_ids storage in
754
+
List.find_map (fun id ->
755
+
match lookup_user_by_id storage id with
756
+
| Some user when normalize_name user.full_name = normalized_query -> Some user
760
+
(** Smart user matching for a River post *)
761
+
let match_user_for_post storage (post : River.post) =
762
+
let author_email = River.email post in
763
+
let author_name = River.author post in
764
+
Log.debug (fun m -> m "Matching user for post by %s (%s)" author_name author_email);
765
+
(* Try email → name exact → name fuzzy *)
766
+
match lookup_user_by_email storage author_email with
768
+
Log.debug (fun m -> m "Matched by email: %s" user.email);
771
+
(match lookup_user_by_name_exact storage author_name with
773
+
Log.debug (fun m -> m "Matched by exact name: %s" user.full_name);
776
+
match lookup_user_by_name_fuzzy storage author_name with
778
+
Log.debug (fun m -> m "Matched by fuzzy name: %s" user.full_name);
781
+
Log.debug (fun m -> m "No user match found");
784
+
(** Convert HTML content to markdown summary *)
785
+
let content_to_summary content_html ~max_length =
786
+
let markdown = Markdown_converter.to_markdown content_html in
787
+
if String.length markdown <= max_length then markdown
788
+
else String.sub markdown 0 (max_length - 3) ^ "..."
790
+
(** Format a River post for Zulip *)
791
+
let format_river_post ~user_match (post : River.post) =
793
+
match River.summary post with
795
+
| None -> content_to_summary (River.content post) ~max_length:200
798
+
match user_match with
799
+
| Some user -> Printf.sprintf "By @**%s**" user.full_name
800
+
| None -> Printf.sprintf "By %s" (River.author post)
803
+
match River.link post with
804
+
| Some uri -> Printf.sprintf "\n\n[Read more](%s)" (Uri.to_string uri)
807
+
Printf.sprintf "%s\n\n%s%s" author_line summary link_line
809
+
(** Update user's last_river_post_date *)
810
+
let update_user_river_date storage user new_date =
811
+
let updated = { user with last_river_post_date = Some new_date } in
812
+
let reg_str = user_registration_to_string updated in
813
+
Bot_storage.put storage ~key:(storage_key_for_id user.zulip_id) ~value:reg_str
815
+
(** Get latest post date from a list of posts *)
816
+
let get_latest_post_date posts =
817
+
List.fold_left (fun acc post ->
818
+
match River.date post with
820
+
let timestamp = Ptime.to_float_s ptime in
822
+
| None -> Some timestamp
823
+
| Some existing -> Some (max existing timestamp))
827
+
(** Filter posts newer than a timestamp *)
828
+
let filter_posts_since posts since_opt =
829
+
match since_opt with
832
+
List.filter (fun post ->
833
+
match River.date post with
835
+
Ptime.to_float_s ptime > since
839
+
(** Post to Zulip channel *)
840
+
let post_to_zulip client ~channel ~topic ~content =
841
+
let stream_message = Zulip.Message.create ~type_:`Channel ~to_:[channel] ~topic ~content () in
842
+
Zulip.Messages.send client stream_message
844
+
(** Sync feeds and post new items *)
845
+
let sync_river_and_post ~env ~storage ~client () =
846
+
Log.info (fun m -> m "Starting River feed sync");
847
+
let feeds = load_feed_sources storage in
848
+
if feeds = [] then (
849
+
Log.info (fun m -> m "No feeds configured, skipping sync");
853
+
River.with_session env (fun session ->
854
+
Log.debug (fun m -> m "Fetching %d feeds" (List.length feeds));
855
+
let fetched_feeds = List.map (fun source ->
856
+
Log.debug (fun m -> m "Fetching: %s" source.River.name);
857
+
River.fetch session source
859
+
let all_posts = River.posts fetched_feeds in
860
+
Log.info (fun m -> m "Fetched %d total posts" (List.length all_posts));
862
+
(* Post new items *)
863
+
let users = List.filter_map (lookup_user_by_id storage) (get_all_user_ids storage) in
864
+
let posted_count = ref 0 in
865
+
let channel = get_river_channel storage in
867
+
List.iter (fun user ->
868
+
let new_posts = filter_posts_since all_posts user.last_river_post_date in
869
+
List.iter (fun post ->
870
+
let user_match = match_user_for_post storage post in
871
+
let topic = River.title post in
872
+
let content = format_river_post ~user_match post in
873
+
Log.info (fun m -> m "Posting to #%s: %s" channel topic);
874
+
match post_to_zulip client ~channel ~topic ~content with
875
+
| Ok _response -> incr posted_count
876
+
| Error e -> Log.err (fun m -> m "Failed to post: %s" (Zulip.error_message e))
879
+
match get_latest_post_date all_posts with
880
+
| Some latest -> let _ = update_user_river_date storage user latest in ()
884
+
let _ = update_river_last_sync storage (Unix.gettimeofday ()) in
885
+
Log.info (fun m -> m "Sync complete, posted %d items" !posted_count);
889
+
let msg = Printf.sprintf "Sync failed: %s" (Printexc.to_string exn) in
890
+
Log.err (fun m -> m "%s" msg);
891
+
Error (Zulip.create_error ~code:(Other "sync_error") ~msg ())
(** Create the bot handler instance *)
let create_handler config storage identity =