moly_kit/widgets/
chat.rs

1use futures::{StreamExt, stream::AbortHandle};
2use makepad_widgets::*;
3use std::cell::{Ref, RefMut};
4use utils::asynchronous::spawn;
5
6use crate::utils::asynchronous::PlatformSendStream;
7use crate::utils::makepad::EventExt;
8use crate::utils::ui_runner::DeferWithRedrawAsync;
9use crate::widgets::moly_modal::MolyModalWidgetExt;
10
11use crate::mcp::mcp_manager::display_name_from_namespaced;
12use crate::*;
13
14live_design!(
15    use link::theme::*;
16    use link::widgets::*;
17    use link::moly_kit_theme::*;
18    use link::shaders::*;
19
20    use crate::widgets::messages::*;
21    use crate::widgets::prompt_input::*;
22    use crate::widgets::moly_modal::*;
23    use crate::widgets::realtime::*;
24
25    pub Chat = {{Chat}} <RoundedView> {
26        flow: Down,
27        messages = <Messages> {}
28        prompt = <PromptInput> {}
29
30        <View> {
31            width: Fill, height: Fit
32            flow: Overlay
33
34            audio_modal = <MolyModal> {
35                dismiss_on_focus_lost: false
36                content: <RealtimeContent> {}
37            }
38        }
39    }
40);
41
42/// A task of interest that was or will be performed by the [Chat] widget.
43///
44/// You can get notified when a group of tasks were already executed by using [Chat::set_hook_after].
45///
46/// You can also "hook" into the group of tasks before it's executed with [Chat::set_hook_before].
47/// This allows you to modify their payloads (which are used by the task when executed), add and remove
48/// tasks from the group, abort the group (by clearing the tasks vector), etc.
49// TODO: Using indexes for many operations like `UpdateMessage` is not ideal. In the future
50// messages may need to have a unique identifier.
51#[derive(Debug, Clone, PartialEq)]
52pub enum ChatTask {
53    /// When received back, it will send the whole chat context to the bot.
54    Send,
55
56    /// When received back, it will cancel the response stream from the bot.
57    Stop,
58
59    /// When received back, it will copy the message at the given index to the clipboard.
60    CopyMessage(usize),
61
62    /// When received back, it will re-write the message history with the given messages.
63    SetMessages(Vec<Message>),
64
65    /// When received back, it will insert a message at the given index.
66    InsertMessage(usize, Message),
67
68    /// When received back, it will delete the message at the given index.
69    DeleteMessage(usize),
70
71    /// When received back, it will update the message at the given index.
72    UpdateMessage(usize, Message),
73
74    /// When received back, it will clear the prompt input.
75    ClearPrompt,
76
77    /// When received back, the chat will scroll to the bottom.
78    ///
79    /// The boolean indicates if the scroll was triggered by a stream or not.
80    ScrollToBottom(bool),
81
82    /// When received back, it will approve and execute the tool calls in the message at the given index.
83    ApproveToolCalls(usize),
84
85    /// When received back, it will deny the tool calls in the message at the given index.
86    DenyToolCalls(usize),
87}
88
89impl From<ChatTask> for Vec<ChatTask> {
90    fn from(task: ChatTask) -> Self {
91        vec![task]
92    }
93}
94
95/// A batteries-included chat to to implement chatbots.
96#[derive(Live, LiveHook, Widget)]
97pub struct Chat {
98    #[deref]
99    deref: View,
100
101    /// The [BotContext] used by this chat to hold bots and interact with them.
102    #[rust]
103    bot_context: Option<BotContext>,
104
105    /// The id of the bot the chat will message when sending.
106    // TODO: Can this be live?
107    // TODO: Default to the first bot in [BotContext] if `None`.
108    #[rust]
109    bot_id: Option<BotId>,
110
111    /// Toggles response streaming on or off. Default is on.
112    // TODO: Implement this.
113    #[live(true)]
114    pub stream: bool,
115
116    #[rust]
117    abort_handle: Option<AbortHandle>,
118
119    /// Used to control we are putting message deltas into the right message during
120    /// streaming.
121    // Note: If messages had unique identifiers, we wouldn't need to keep a copy of
122    // the message as a workaround.
123    #[rust]
124    expected_message: Option<Message>,
125
126    #[rust]
127    hook_before: Option<Box<dyn FnMut(&mut Vec<ChatTask>, &mut Chat, &mut Cx)>>,
128
129    #[rust]
130    hook_after: Option<Box<dyn FnMut(&[ChatTask], &mut Chat, &mut Cx)>>,
131
132    #[rust]
133    is_hooking: bool,
134
135    /// Wether the user has scrolled during the stream of the current message.
136    #[rust]
137    user_scrolled_during_stream: bool,
138
139    /// Tasks queued during dispatch to avoid nested dispatch calls
140    #[rust]
141    pending_tasks: Vec<ChatTask>,
142}
143
144impl Widget for Chat {
145    fn handle_event(&mut self, cx: &mut Cx, event: &Event, scope: &mut Scope) {
146        // Pass down the BotContext if not the same.
147        self.messages_ref().write_with(|m| {
148            if m.bot_context != self.bot_context {
149                m.bot_context = self.bot_context.clone();
150            }
151        });
152
153        self.ui_runner().handle(cx, event, scope, self);
154        self.deref.handle_event(cx, event, scope);
155        self.handle_messages(cx, event);
156        self.handle_prompt_input(cx, event);
157        self.handle_realtime(cx);
158        self.handle_modal_dismissal(cx, event);
159        self.handle_scrolling();
160    }
161
162    fn draw_walk(&mut self, cx: &mut Cx2d, scope: &mut Scope, walk: Walk) -> DrawStep {
163        self.deref.draw_walk(cx, scope, walk)
164    }
165}
166
167impl Chat {
168    /// Getter to the underlying [PromptInputRef] independent of its id.
169    pub fn prompt_input_ref(&self) -> PromptInputRef {
170        self.prompt_input(id!(prompt))
171    }
172
173    /// Getter to the underlying [MessagesRef] independent of its id.
174    pub fn messages_ref(&self) -> MessagesRef {
175        self.messages(id!(messages))
176    }
177
178    fn handle_prompt_input(&mut self, cx: &mut Cx, event: &Event) {
179        if self.prompt_input_ref().read().submitted(event.actions()) {
180            self.handle_submit(cx);
181        }
182
183        if self.prompt_input_ref().read().call_pressed(event.actions()) {
184            self.handle_call(cx);
185        }
186    }
187
188    fn handle_realtime(&mut self, cx: &mut Cx) {
189        if self.realtime(id!(realtime)).connection_requested() {
190            self.dispatch(cx, &mut ChatTask::Send.into());
191        }
192    }
193
194    fn handle_modal_dismissal(&mut self, cx: &mut Cx, event: &Event) {
195        // Check if the modal should be dismissed
196        for action in event.actions() {
197            if let RealtimeModalAction::DismissModal = action.cast() {
198                self.moly_modal(id!(audio_modal)).close(cx);
199            }
200        }
201
202        // Check if the audio modal was dismissed
203        if self.moly_modal(id!(audio_modal)).dismissed(event.actions()) {
204            // Collect conversation messages from the realtime widget before resetting
205            let mut conversation_messages =
206                self.realtime(id!(realtime)).take_conversation_messages();
207
208            // Reset realtime widget state for cleanup
209            self.realtime(id!(realtime)).reset_state(cx);
210
211            // Add conversation messages to chat history preserving order
212            if !conversation_messages.is_empty() {
213                // Get current messages and append the new conversation messages
214                let mut all_messages = self.messages_ref().read().messages.clone();
215
216                // Add a system message before and after the conversation, informing
217                // that a voice call happened.
218                let system_message = Message {
219                    from: EntityId::App,
220                    content: MessageContent {
221                        text: "Voice call started.".to_string(),
222                        ..Default::default()
223                    },
224                    ..Default::default()
225                };
226                conversation_messages.insert(0, system_message);
227
228                let system_message = Message {
229                    from: EntityId::App,
230                    content: MessageContent {
231                        text: "Voice call ended.".to_string(),
232                        ..Default::default()
233                    },
234                    ..Default::default()
235                };
236                conversation_messages.push(system_message);
237
238                all_messages.extend(conversation_messages);
239                self.dispatch(
240                    cx,
241                    &mut vec![
242                        ChatTask::SetMessages(all_messages),
243                        ChatTask::ScrollToBottom(true),
244                    ],
245                );
246            }
247        }
248    }
249
250    fn handle_capabilities(&mut self, cx: &mut Cx) {
251        if let (Some(bot_context), Some(bot_id)) = (&self.bot_context, &self.bot_id) {
252            if let Some(bot) = bot_context.get_bot(bot_id) {
253                self.prompt_input_ref()
254                    .write()
255                    .set_bot_capabilities(cx, Some(bot.capabilities.clone()));
256            } else if self.bot_id.is_none() {
257                self.prompt_input_ref()
258                    .write()
259                    .set_bot_capabilities(cx, None);
260            }
261        }
262    }
263
264    fn handle_scrolling(&mut self) {
265        // If we are waiting for a message, update wether the user has scrolled during the stream.
266        if self.expected_message.is_some() {
267            self.user_scrolled_during_stream = self.messages_ref().read().user_scrolled();
268        }
269    }
270
271    fn handle_messages(&mut self, cx: &mut Cx, event: &Event) {
272        for action in event.actions() {
273            let Some(action) = action.as_widget_action() else {
274                continue;
275            };
276
277            if action.widget_uid != self.messages_ref().widget_uid() {
278                continue;
279            }
280
281            match action.cast::<MessagesAction>() {
282                MessagesAction::Delete(index) => {
283                    self.dispatch(cx, &mut ChatTask::DeleteMessage(index).into());
284                }
285                MessagesAction::Copy(index) => {
286                    self.dispatch(cx, &mut ChatTask::CopyMessage(index).into());
287                }
288                MessagesAction::EditSave(index) => {
289                    let mut tasks = self.messages_ref().read_with(|m| {
290                        let mut message = m.messages[index].clone();
291                        message.update_content(|content| {
292                            content.text = m.current_editor_text().expect("no editor text");
293                        });
294                        ChatTask::UpdateMessage(index, message).into()
295                    });
296
297                    self.dispatch(cx, &mut tasks);
298                }
299                MessagesAction::EditRegenerate(index) => {
300                    let mut tasks = self.messages_ref().read_with(|m| {
301                        let mut messages = m.messages[0..=index].to_vec();
302
303                        let index = m.current_editor_index().expect("no editor index");
304                        let text = m.current_editor_text().expect("no editor text");
305
306                        messages[index].update_content(|content| {
307                            content.text = text;
308                        });
309
310                        vec![ChatTask::SetMessages(messages), ChatTask::Send]
311                    });
312
313                    self.dispatch(cx, &mut tasks);
314                }
315                MessagesAction::ToolApprove(index) => {
316                    self.dispatch(cx, &mut ChatTask::ApproveToolCalls(index).into());
317                }
318                MessagesAction::ToolDeny(index) => {
319                    self.dispatch(cx, &mut ChatTask::DenyToolCalls(index).into());
320                }
321                MessagesAction::None => {}
322            }
323        }
324    }
325
326    fn handle_submit(&mut self, cx: &mut Cx) {
327        let prompt = self.prompt_input_ref();
328
329        if prompt.read().has_send_task() {
330            let next_index = self.messages_ref().read().messages.len();
331            let text = prompt.text();
332            let attachments = prompt
333                .read()
334                .attachment_list_ref()
335                .read()
336                .attachments
337                .clone();
338            let mut composition = Vec::new();
339
340            if !text.is_empty() || !attachments.is_empty() {
341                composition.push(ChatTask::InsertMessage(
342                    next_index,
343                    Message {
344                        from: EntityId::User,
345                        content: MessageContent {
346                            text,
347                            attachments,
348                            ..Default::default()
349                        },
350                        ..Default::default()
351                    },
352                ));
353            }
354
355            composition.extend([ChatTask::Send, ChatTask::ClearPrompt]);
356
357            self.dispatch(cx, &mut composition);
358        } else if prompt.read().has_stop_task() {
359            self.dispatch(cx, &mut ChatTask::Stop.into());
360        }
361    }
362
363    fn handle_call(&mut self, cx: &mut Cx) {
364        // Use the standard send mechanism which will return the upgrade
365        // The upgrade message will be processed in handle_message_delta
366        self.dispatch(cx, &mut ChatTask::Send.into());
367    }
368
369    fn handle_tool_calls(
370        &mut self,
371        _cx: &mut Cx,
372        tool_calls: Vec<ToolCall>,
373        loading_message_index: usize,
374    ) {
375        let context = self
376            .bot_context
377            .as_ref()
378            .expect("no BotContext provided")
379            .clone();
380
381        let ui = self.ui_runner();
382        let future = async move {
383            // Get the tool manager from context
384            let Some(tool_manager) = context.tool_manager() else {
385                ui.defer_with_redraw(move |me, cx, _| {
386                    let error_message = Message {
387                        from: EntityId::Tool,
388                        content: MessageContent {
389                            text: "Tool execution failed: Tool manager not available".to_string(),
390                            ..Default::default()
391                        },
392                        metadata: MessageMetadata {
393                            is_writing: false,
394                            ..MessageMetadata::new()
395                        },
396                        ..Default::default()
397                    };
398                    me.dispatch(
399                        cx,
400                        &mut vec![ChatTask::UpdateMessage(
401                            loading_message_index,
402                            error_message,
403                        )],
404                    );
405                });
406                return;
407            };
408
409            // Execute tool calls using MCP manager
410            let tool_results = tool_manager.execute_tool_calls(tool_calls.clone()).await;
411
412            // Update the loading message with tool results and trigger a new send
413            ui.defer_with_redraw(move |me, cx, _| {
414                // Create formatted text for tool results
415                let results_text = if tool_results.len() == 1 {
416                    let result = &tool_results[0];
417                    let tool_name = tool_calls
418                        .iter()
419                        .find(|tc| tc.id == result.tool_call_id)
420                        .map(|tc| tc.name.as_str())
421                        .unwrap_or("unknown");
422
423                    let display_name = display_name_from_namespaced(tool_name);
424                    if result.is_error {
425                        format!("🔧 Tool '{}' failed:\n{}", display_name, result.content)
426                    } else {
427                        let summary = crate::utils::tool_execution::create_tool_output_summary(
428                            tool_name,
429                            &result.content,
430                        );
431                        format!(
432                            "🔧 Tool '{}' executed successfully:\n`{}`",
433                            display_name, summary
434                        )
435                    }
436                } else {
437                    let mut text = format!("🔧 Executed {} tools:\n\n", tool_results.len());
438                    for result in &tool_results {
439                        let tool_name = tool_calls
440                            .iter()
441                            .find(|tc| tc.id == result.tool_call_id)
442                            .map(|tc| tc.name.as_str())
443                            .unwrap_or("unknown");
444
445                        let display_name = display_name_from_namespaced(tool_name);
446                        if result.is_error {
447                            text.push_str(&format!(
448                                "**{}** ❌: {}\n\n",
449                                display_name, result.content
450                            ));
451                        } else {
452                            let summary = crate::utils::tool_execution::create_tool_output_summary(
453                                tool_name,
454                                &result.content,
455                            );
456                            text.push_str(&format!("**{}** ✅: `{}`\n\n", display_name, summary));
457                        }
458                    }
459                    text
460                };
461
462                // Update the existing loading message with tool results
463                let updated_message = Message {
464                    from: EntityId::Tool, // Tool results use the tool role
465                    content: MessageContent {
466                        text: results_text,
467                        tool_results,
468                        ..Default::default()
469                    },
470                    metadata: MessageMetadata {
471                        is_writing: false, // No longer loading
472                        ..MessageMetadata::new()
473                    },
474                    ..Default::default()
475                };
476
477                me.dispatch(
478                    cx,
479                    &mut vec![
480                        ChatTask::UpdateMessage(loading_message_index, updated_message),
481                        ChatTask::Send, // Trigger a new send with the tool results
482                    ],
483                );
484            });
485        };
486
487        spawn(future);
488    }
489
490    fn handle_send_task(&mut self, cx: &mut Cx) {
491        // Let's start clean before starting a new stream.
492        self.abort();
493
494        // TODO: See `bot_id` TODO.
495        let bot_id = self.bot_id.clone().expect("no bot selected");
496
497        let context = self
498            .bot_context
499            .as_ref()
500            .expect("no BotContext provided")
501            .clone();
502
503        // First check if the bot exists in the BotContext.
504        if context.get_bot(&bot_id).is_none() {
505            // Bot not found, add error message
506            let next_index = self.messages_ref().read().messages.len();
507            let error_message = format!(
508                "App error: Bot not found. The bot might have been disabled or removed. Bot ID: {}",
509                bot_id
510            );
511
512            let message = Message {
513                from: EntityId::App,
514                content: MessageContent {
515                    text: error_message,
516                    ..Default::default()
517                },
518                ..Default::default()
519            };
520
521            self.dispatch(cx, &mut vec![ChatTask::InsertMessage(next_index, message)]);
522            return;
523        }
524
525        let messages_history_context: Vec<Message> = self.messages_ref().write_with(|messages| {
526            messages.bot_context = Some(context.clone());
527
528            messages
529                .messages
530                .iter()
531                .filter(|m| m.metadata.is_idle() && m.from != EntityId::App)
532                .cloned()
533                .collect()
534        });
535
536        // The realtime check is hack to avoid showing a loading message for realtime assistants
537        // TODO: we should base this on upgrade rather than capabilities
538        let bot = context.get_bot(&bot_id).unwrap(); // We already checked it exists above
539        if !bot.capabilities.supports_realtime() {
540            let loading_message = Message {
541                from: EntityId::Bot(bot_id.clone()),
542                metadata: MessageMetadata {
543                    is_writing: true,
544                    ..MessageMetadata::new()
545                },
546                ..Default::default()
547            };
548
549            let next_index = self.messages_ref().read().messages.len();
550            self.dispatch(
551                cx,
552                &mut vec![ChatTask::InsertMessage(next_index, loading_message)],
553            );
554        }
555
556        self.dispatch(cx, &mut vec![ChatTask::ScrollToBottom(false)]);
557        self.prompt_input_ref().write().set_stop();
558        self.redraw(cx);
559
560        let ui = self.ui_runner();
561        let future = async move {
562            let mut client = context.client();
563            let bot = match context.get_bot(&bot_id) {
564                Some(bot) => bot,
565                None => {
566                    // This should never happen as we check above, but handle it gracefully anyway
567                    let bot_id_clone = bot_id.clone(); // Clone the bot_id for the closure
568                    ui.defer_with_redraw(move |me, cx, _| {
569                        let error_message = format!(
570                            "App error: Bot not found during stream initialization. Bot ID: {}",
571                            bot_id_clone
572                        );
573                        let next_index = me.messages_ref().read().messages.len();
574                        let message = Message {
575                            from: EntityId::App,
576                            content: MessageContent {
577                                text: error_message,
578                                ..Default::default()
579                            },
580                            ..Default::default()
581                        };
582                        me.dispatch(cx, &mut vec![ChatTask::InsertMessage(next_index, message)]);
583                    });
584                    return;
585                }
586            };
587
588            let tools = if let Some(tool_manager) = context.tool_manager() {
589                tool_manager.get_all_namespaced_tools()
590            } else {
591                Vec::new()
592            };
593
594            let message_stream = amortize(client.send(&bot.id, &messages_history_context, &tools));
595            let mut message_stream = std::pin::pin!(message_stream);
596            while let Some(result) = message_stream.next().await {
597                // In theory, with the synchroneous defer, if stream messages come
598                // faster than deferred closures are executed, and one closure causes
599                // an abort, the other already deferred closures will still be executed
600                // and may cause race conditions.
601                //
602                // In practice, this never happened in my tests. But better safe than
603                // sorry. The async variant let this async context wait before processing
604                // the next delta. And also allows to stop naturally from here as well
605                // thanks to its ability to send a value back.
606                let should_break = ui
607                    .defer_with_redraw_async(move |me, cx, _| me.handle_message_delta(cx, result))
608                    .await;
609
610                if should_break.unwrap_or(true) {
611                    break;
612                }
613            }
614        };
615
616        let (future, abort_handle) = futures::future::abortable(future);
617        self.abort_handle = Some(abort_handle);
618
619        spawn(async move {
620            // The wrapper Future is only error if aborted.
621            //
622            // Cleanup caused by signaling stuff like `Chat::abort` should be done synchronously
623            // so one can abort and immediately start a new stream without race conditions.
624            //
625            // Only cleanup after natural termination of the stream should be here.
626            if future.await.is_ok() {
627                ui.defer_with_redraw(|me, _, _| me.clean_streaming_artifacts());
628            }
629        });
630    }
631
632    fn handle_stop_task(&mut self, cx: &mut Cx) {
633        self.abort();
634        self.redraw(cx);
635    }
636
637    /// Immediately remove resources/data related to the current streaming and signal
638    /// the stream to stop as soon as possible.
639    fn abort(&mut self) {
640        self.abort_handle.take().map(|handle| handle.abort());
641        self.clean_streaming_artifacts();
642    }
643
644    /// Dispatch a set of tasks to be executed by the [Chat] widget as a single hookable
645    /// unit of work.
646    ///
647    /// You can still hook into these tasks before they are executed if you set a hook with
648    /// [Chat::set_hook_before].
649    ///
650    /// Warning: Like other operation over makepad's [WidgetRef], this function may panic if you hold
651    /// borrows to widgets inside [Chat], for example [Messages] or [PromptInput]. Be aware when using
652    /// `read_with` and `write_with` methods.
653    // TODO: Mitigate interior mutability issues with many tricks or improving makepad.
654    pub fn dispatch(&mut self, cx: &mut Cx, tasks: &mut Vec<ChatTask>) {
655        // Prevent nested dispatch - queue tasks if we're already dispatching
656        if self.is_hooking {
657            self.pending_tasks.extend(tasks.iter().cloned());
658            return;
659        }
660
661        self.is_hooking = true;
662
663        if let Some(mut hook) = self.hook_before.take() {
664            hook(tasks, self, cx);
665            self.hook_before = Some(hook);
666        }
667
668        for task in tasks.iter() {
669            self.handle_task(cx, task);
670        }
671
672        if let Some(mut hook) = self.hook_after.take() {
673            hook(tasks, self, cx);
674            self.hook_after = Some(hook);
675        }
676
677        self.is_hooking = false;
678
679        // Process any pending tasks that were queued during execution
680        if !self.pending_tasks.is_empty() {
681            let mut pending = std::mem::take(&mut self.pending_tasks);
682            self.dispatch(cx, &mut pending);
683        }
684    }
685
686    /// Performs a set of tasks in the [Chat] widget immediately.
687    ///
688    /// This is not hookable.
689    pub fn perform(&mut self, cx: &mut Cx, tasks: &[ChatTask]) {
690        for task in tasks {
691            self.handle_task(cx, &task);
692        }
693    }
694
695    fn handle_task(&mut self, cx: &mut Cx, task: &ChatTask) {
696        match task {
697            ChatTask::CopyMessage(index) => {
698                self.messages_ref().read_with(|m| {
699                    let text = &m.messages[*index].content.text;
700                    cx.copy_to_clipboard(text);
701                });
702            }
703            ChatTask::DeleteMessage(index) => {
704                self.messages_ref().write().messages.remove(*index);
705                self.redraw(cx);
706            }
707            ChatTask::InsertMessage(index, message) => {
708                self.messages_ref()
709                    .write()
710                    .messages
711                    .insert(*index, message.clone());
712                self.redraw(cx);
713            }
714            ChatTask::Send => {
715                self.handle_send_task(cx);
716            }
717            ChatTask::Stop => {
718                self.handle_stop_task(cx);
719            }
720            ChatTask::UpdateMessage(index, message) => {
721                self.messages_ref().write_with(|m| {
722                    let new_message = message.clone();
723                    let old_message = m.messages.get_mut(*index).expect("no message at index");
724
725                    *old_message = new_message;
726                    m.set_message_editor_visibility(*index, false);
727                });
728
729                self.redraw(cx);
730            }
731            ChatTask::SetMessages(messages) => {
732                self.messages_ref().write_with(|m| {
733                    m.messages = messages.clone();
734
735                    if let Some(index) = m.current_editor_index() {
736                        m.set_message_editor_visibility(index, false);
737                    }
738                });
739
740                self.redraw(cx);
741            }
742            ChatTask::ClearPrompt => {
743                self.prompt_input_ref().write().reset(cx);
744            }
745            ChatTask::ScrollToBottom(triggered_by_stream) => {
746                self.messages_ref()
747                    .write()
748                    .scroll_to_bottom(cx, *triggered_by_stream);
749            }
750            ChatTask::ApproveToolCalls(index) => {
751                // Get the tool calls from the message and mark them as approved
752                let mut message_updated = None;
753                let tool_calls = self.messages_ref().write_with(|m| {
754                    if let Some(message) = m.messages.get_mut(*index) {
755                        message.update_content(|content| {
756                            for tool_call in &mut content.tool_calls {
757                                tool_call.permission_status = ToolCallPermissionStatus::Approved;
758                            }
759                        });
760                        message_updated = Some(message.clone());
761                        message.content.tool_calls.clone()
762                    } else {
763                        Vec::new()
764                    }
765                });
766
767                if let Some(message) = message_updated {
768                    self.dispatch(cx, &mut vec![ChatTask::UpdateMessage(*index, message)]);
769                }
770
771                if !tool_calls.is_empty() {
772                    // Add immediate system message with loading state
773                    let next_index = self.messages_ref().read().messages.len();
774                    let loading_text = if tool_calls.len() == 1 {
775                        format!(
776                            "Executing tool '{}'...",
777                            display_name_from_namespaced(&tool_calls[0].name)
778                        )
779                    } else {
780                        format!("Executing {} tools...", tool_calls.len())
781                    };
782
783                    let loading_message = Message {
784                        from: EntityId::Tool,
785                        content: MessageContent {
786                            text: loading_text,
787                            ..Default::default()
788                        },
789                        metadata: MessageMetadata {
790                            is_writing: true,
791                            ..MessageMetadata::new()
792                        },
793                        ..Default::default()
794                    };
795
796                    self.dispatch(
797                        cx,
798                        &mut vec![ChatTask::InsertMessage(next_index, loading_message)],
799                    );
800
801                    self.handle_tool_calls(cx, tool_calls, next_index);
802                } else {
803                    ::log::error!("No tool calls found at index: {}", index);
804                }
805
806                self.redraw(cx);
807            }
808            ChatTask::DenyToolCalls(index) => {
809                // Get the tool calls from the message and mark them as denied
810                let mut message_updated = None;
811                let tool_calls = self.messages_ref().write_with(|m| {
812                    if let Some(message) = m.messages.get_mut(*index) {
813                        message.update_content(|content| {
814                            for tool_call in &mut content.tool_calls {
815                                tool_call.permission_status = ToolCallPermissionStatus::Denied;
816                            }
817                        });
818                        message_updated = Some(message.clone());
819                        message.content.tool_calls.clone()
820                    } else {
821                        Vec::new()
822                    }
823                });
824
825                if let Some(message) = message_updated {
826                    self.dispatch(cx, &mut vec![ChatTask::UpdateMessage(*index, message)]);
827                }
828
829                if !tool_calls.is_empty() {
830                    // Create synthetic tool results indicating denial to maintain conversation flow
831                    let tool_results: Vec<ToolResult> = tool_calls.iter().map(|tc| {
832                        let display_name = display_name_from_namespaced(&tc.name);
833                        ToolResult {
834                            tool_call_id: tc.id.clone(),
835                            content: format!("Tool execution was denied by the user. Tool '{}' was not executed.", display_name),
836                            is_error: true,
837                        }
838                    }).collect();
839
840                    let next_index = self.messages_ref().read().messages.len();
841
842                    // Add tool result message with denial results
843                    let tool_message = Message {
844                        from: EntityId::Tool,
845                        content: MessageContent {
846                            text: "🚫 Tool execution was denied by the user.".to_string(),
847                            tool_results,
848                            ..Default::default()
849                        },
850                        ..Default::default()
851                    };
852
853                    // Continue the conversation with the denial results
854                    self.dispatch(
855                        cx,
856                        &mut vec![
857                            ChatTask::InsertMessage(next_index, tool_message),
858                            ChatTask::Send,
859                        ],
860                    );
861                }
862
863                self.redraw(cx);
864            }
865        }
866    }
867
868    /// Sets a hook to be executed before a group of tasks is executed.
869    ///
870    /// You get mutable access to the group of tasks, so you can modify what is
871    /// about to happen. See [ChatTask] for more details about this.
872    ///
873    /// If you just want to get notified when something already happened, see [Chat::set_hook_after].
874    pub fn set_hook_before(
875        &mut self,
876        hook: impl FnMut(&mut Vec<ChatTask>, &mut Chat, &mut Cx) + 'static,
877    ) {
878        if self.is_hooking {
879            panic!("Cannot set a hook while hooking");
880        }
881
882        self.hook_before = Some(Box::new(hook));
883    }
884
885    /// Sets a hook to be executed after a group of tasks is executed.
886    ///
887    /// You get immutable access to the group of tasks, so you can inspect what happened.
888    pub fn set_hook_after(&mut self, hook: impl FnMut(&[ChatTask], &mut Chat, &mut Cx) + 'static) {
889        if self.is_hooking {
890            panic!("Cannot set a hook while hooking");
891        }
892
893        self.hook_after = Some(Box::new(hook));
894    }
895
896    /// Remove data related to current streaming, leaving everything ready for a new one.
897    ///
898    /// Called as soon as possible after streaming completes naturally or immediately when
899    /// calling [Chat::abort].
900    fn clean_streaming_artifacts(&mut self) {
901        self.abort_handle = None;
902        self.expected_message = None;
903        self.user_scrolled_during_stream = false;
904        self.messages_ref().write().reset_scroll_state();
905        self.prompt_input_ref().write().set_send();
906        self.messages_ref().write().messages.retain_mut(|m| {
907            m.metadata.is_writing = false;
908            !m.content.is_empty()
909        });
910    }
911
912    /// Handles a message delta from the bot.
913    ///
914    /// Returns true if the message delta was handled successfully.
915    fn handle_message_delta(&mut self, cx: &mut Cx, result: ClientResult<MessageContent>) -> bool {
916        let messages = self.messages_ref();
917
918        // For simplicity, lets handle this as an standard Result, ignoring delta
919        // if there are errors.
920        match result.into_result() {
921            Ok(content) => {
922                // Check if this is a realtime upgrade message
923                if let Some(Upgrade::Realtime(channel)) = &content.upgrade {
924                    // Clean up any loading state since we're opening the modal instead
925                    self.clean_streaming_artifacts();
926
927                    // Set up the realtime channel in the UI
928                    let mut realtime = self.realtime(id!(realtime));
929                    realtime.set_bot_entity_id(
930                        cx,
931                        EntityId::Bot(self.bot_id.clone().unwrap_or_default()),
932                    );
933                    realtime.set_realtime_channel(channel.clone());
934                    realtime.set_bot_context(self.bot_context.clone());
935
936                    let modal = self.moly_modal(id!(audio_modal));
937                    modal.open(cx);
938
939                    // Skip the rest, do not add a message to the chat
940                    return true;
941                }
942
943                // Let's abort if we don't have where to put the delta.
944                let Some(mut message) = messages.read().messages.last().cloned() else {
945                    return true;
946                };
947
948                // Let's abort if we see we are putting delta in the wrong message.
949                if let Some(expected_message) = self.expected_message.as_ref() {
950                    if message.from != expected_message.from
951                        || message.content != expected_message.content
952                        || message.metadata.is_writing != expected_message.metadata.is_writing
953                        || message.metadata.created_at != expected_message.metadata.created_at
954                    {
955                        log!("Unexpected message to put delta in. Stopping.");
956                        return true;
957                    }
958                }
959
960                message.set_content(content);
961
962                let index = messages.read().messages.len() - 1;
963                let mut tasks = vec![ChatTask::UpdateMessage(index, message.clone())];
964
965                // Stick the chat to the bottom if the user didn't manually scroll.
966                if !self.user_scrolled_during_stream {
967                    tasks.push(ChatTask::ScrollToBottom(true));
968                }
969
970                self.dispatch(cx, &mut tasks);
971
972                let Some(ChatTask::UpdateMessage(_, updated_message)) = tasks.into_iter().next()
973                else {
974                    // Let's abort if the tasks were modified in an unexpected way.
975                    return true;
976                };
977
978                if !updated_message.content.tool_calls.is_empty() {
979                    // Mark message as not writing since tool calls are complete
980                    let mut final_message = updated_message.clone();
981                    final_message.metadata.is_writing = false;
982                    self.dispatch(cx, &mut vec![ChatTask::UpdateMessage(index, final_message)]);
983                    // TODO: We might want to dispatch a ChatTask::RequestToolPermission(index) here
984                    // Check if dangerous mode is enabled to auto-approve tool calls
985                    let dangerous_mode_enabled = self
986                        .bot_context
987                        .as_ref()
988                        .map(|ctx| {
989                            ctx.tool_manager()
990                                .map(|tm| tm.get_dangerous_mode_enabled())
991                                .unwrap_or(false)
992                        })
993                        .unwrap_or(false);
994
995                    if dangerous_mode_enabled {
996                        // Auto-approve tool calls in dangerous mode
997                        self.dispatch(cx, &mut vec![ChatTask::ApproveToolCalls(index)]);
998                    }
999
1000                    // Signal to stop the current stream since we're switching to tool execution
1001                    return true;
1002                }
1003
1004                self.expected_message = Some(updated_message);
1005
1006                false
1007            }
1008            Err(errors) => {
1009                let mut tasks = errors
1010                    .into_iter()
1011                    .enumerate()
1012                    .map(|(i, e)| {
1013                        ChatTask::InsertMessage(
1014                            messages.read().messages.len() + i,
1015                            Message {
1016                                from: EntityId::App,
1017                                content: MessageContent {
1018                                    text: e.to_string(),
1019                                    ..Default::default()
1020                                },
1021                                ..Default::default()
1022                            },
1023                        )
1024                    })
1025                    .collect::<Vec<_>>();
1026
1027                // Stick the chat to the bottom if the user didn't manually scroll.
1028                if !self.user_scrolled_during_stream {
1029                    tasks.push(ChatTask::ScrollToBottom(true));
1030                }
1031
1032                self.dispatch(cx, &mut tasks);
1033                true
1034            }
1035        }
1036    }
1037
1038    /// Returns true if the chat is currently streaming.
1039    pub fn is_streaming(&self) -> bool {
1040        if let Some(message) = self.messages_ref().read().messages.last() {
1041            message.metadata.is_writing
1042        } else {
1043            false
1044        }
1045    }
1046
1047    pub fn set_bot_id(&mut self, cx: &mut Cx, bot_id: Option<BotId>) {
1048        self.bot_id = bot_id;
1049        self.handle_capabilities(cx);
1050    }
1051
1052    pub fn bot_id(&self) -> Option<&BotId> {
1053        self.bot_id.as_ref()
1054    }
1055
1056    pub fn set_bot_context(&mut self, cx: &mut Cx, bot_context: Option<BotContext>) {
1057        self.bot_context = bot_context;
1058        self.handle_capabilities(cx);
1059    }
1060
1061    pub fn bot_context(&self) -> Option<&BotContext> {
1062        self.bot_context.as_ref()
1063    }
1064}
1065
1066// TODO: Since `ChatRef` is generated by a macro, I can't document this to give
1067// these functions better visibility from the module view.
1068impl ChatRef {
1069    /// Immutable access to the underlying [Chat].
1070    ///
1071    /// Panics if the widget reference is empty or if it's already borrowed.
1072    pub fn read(&self) -> Ref<'_, Chat> {
1073        self.borrow().unwrap()
1074    }
1075
1076    /// Mutable access to the underlying [Chat].
1077    ///
1078    /// Panics if the widget reference is empty or if it's already borrowed.
1079    pub fn write(&mut self) -> RefMut<'_, Chat> {
1080        self.borrow_mut().unwrap()
1081    }
1082
1083    /// Immutable reader to the underlying [Chat].
1084    ///
1085    /// Panics if the widget reference is empty or if it's already borrowed.
1086    pub fn read_with<R>(&self, f: impl FnOnce(&Chat) -> R) -> R {
1087        f(&*self.read())
1088    }
1089
1090    /// Mutable writer to the underlying [Chat].
1091    ///
1092    /// Panics if the widget reference is empty or if it's already borrowed.
1093    pub fn write_with<R>(&mut self, f: impl FnOnce(&mut Chat) -> R) -> R {
1094        f(&mut *self.write())
1095    }
1096}
1097
1098/// Util that wraps the stream of `send()` and gives you a stream less agresive to
1099/// the receiver UI regardless of the streaming chunk size.
1100fn amortize(
1101    input: impl PlatformSendStream<Item = ClientResult<MessageContent>> + 'static,
1102) -> impl PlatformSendStream<Item = ClientResult<MessageContent>> + 'static {
1103    // Use utils
1104    use crate::utils::string::AmortizedString;
1105    use async_stream::stream;
1106
1107    // Stream state
1108    let mut amortized_text = AmortizedString::default();
1109    let mut amortized_reasoning = AmortizedString::default();
1110
1111    // Stream compute
1112    stream! {
1113        // Our wrapper stream "activates" when something comes from the underlying stream.
1114        for await result in input {
1115            // Transparently yield the result on error and then stop.
1116            if result.has_errors() {
1117                yield result;
1118                return;
1119            }
1120
1121            // Modified content that we will be yielding.
1122            let mut content = result.into_value().unwrap();
1123
1124            // Feed the whole string into the string amortizer.
1125            // Put back what has been already amortized from previous iterations.
1126            let text = std::mem::take(&mut content.text);
1127            amortized_text.update(text);
1128            content.text = amortized_text.current().to_string();
1129
1130            // Same for reasoning.
1131            let reasoning = std::mem::take(&mut content.reasoning);
1132            amortized_reasoning.update(reasoning);
1133            content.reasoning = amortized_reasoning.current().to_string();
1134
1135            // Prioritize yielding amortized reasoning updates first.
1136            for reasoning in &mut amortized_reasoning {
1137                content.reasoning = reasoning;
1138                yield ClientResult::new_ok(content.clone());
1139            }
1140
1141            // Finially, begin yielding amortized text updates.
1142            // This will also include the amortized reasoning until now because we
1143            // fed it back into the content.
1144            for text in &mut amortized_text {
1145                content.text = text;
1146                yield ClientResult::new_ok(content.clone());
1147            }
1148        }
1149    }
1150}