1use futures::{StreamExt, stream::AbortHandle};
2use makepad_widgets::*;
3use std::cell::{Ref, RefMut};
4use utils::asynchronous::spawn;
5
6use crate::utils::asynchronous::PlatformSendStream;
7use crate::utils::makepad::EventExt;
8use crate::utils::ui_runner::DeferWithRedrawAsync;
9use crate::widgets::moly_modal::MolyModalWidgetExt;
10
11use crate::mcp::mcp_manager::display_name_from_namespaced;
12use crate::*;
13
14live_design!(
15 use link::theme::*;
16 use link::widgets::*;
17 use link::moly_kit_theme::*;
18 use link::shaders::*;
19
20 use crate::widgets::messages::*;
21 use crate::widgets::prompt_input::*;
22 use crate::widgets::moly_modal::*;
23 use crate::widgets::realtime::*;
24
25 pub Chat = {{Chat}} <RoundedView> {
26 flow: Down,
27 messages = <Messages> {}
28 prompt = <PromptInput> {}
29
30 <View> {
31 width: Fill, height: Fit
32 flow: Overlay
33
34 audio_modal = <MolyModal> {
35 dismiss_on_focus_lost: false
36 content: <RealtimeContent> {}
37 }
38 }
39 }
40);
41
42#[derive(Debug, Clone, PartialEq)]
52pub enum ChatTask {
53 Send,
55
56 Stop,
58
59 CopyMessage(usize),
61
62 SetMessages(Vec<Message>),
64
65 InsertMessage(usize, Message),
67
68 DeleteMessage(usize),
70
71 UpdateMessage(usize, Message),
73
74 ClearPrompt,
76
77 ScrollToBottom(bool),
81
82 ApproveToolCalls(usize),
84
85 DenyToolCalls(usize),
87}
88
89impl From<ChatTask> for Vec<ChatTask> {
90 fn from(task: ChatTask) -> Self {
91 vec![task]
92 }
93}
94
95#[derive(Live, LiveHook, Widget)]
97pub struct Chat {
98 #[deref]
99 deref: View,
100
101 #[rust]
103 bot_context: Option<BotContext>,
104
105 #[rust]
109 bot_id: Option<BotId>,
110
111 #[live(true)]
114 pub stream: bool,
115
116 #[rust]
117 abort_handle: Option<AbortHandle>,
118
119 #[rust]
124 expected_message: Option<Message>,
125
126 #[rust]
127 hook_before: Option<Box<dyn FnMut(&mut Vec<ChatTask>, &mut Chat, &mut Cx)>>,
128
129 #[rust]
130 hook_after: Option<Box<dyn FnMut(&[ChatTask], &mut Chat, &mut Cx)>>,
131
132 #[rust]
133 is_hooking: bool,
134
135 #[rust]
137 user_scrolled_during_stream: bool,
138
139 #[rust]
141 pending_tasks: Vec<ChatTask>,
142}
143
144impl Widget for Chat {
145 fn handle_event(&mut self, cx: &mut Cx, event: &Event, scope: &mut Scope) {
146 self.messages_ref().write_with(|m| {
148 if m.bot_context != self.bot_context {
149 m.bot_context = self.bot_context.clone();
150 }
151 });
152
153 self.ui_runner().handle(cx, event, scope, self);
154 self.deref.handle_event(cx, event, scope);
155 self.handle_messages(cx, event);
156 self.handle_prompt_input(cx, event);
157 self.handle_realtime(cx);
158 self.handle_modal_dismissal(cx, event);
159 self.handle_scrolling();
160 }
161
162 fn draw_walk(&mut self, cx: &mut Cx2d, scope: &mut Scope, walk: Walk) -> DrawStep {
163 self.deref.draw_walk(cx, scope, walk)
164 }
165}
166
167impl Chat {
168 pub fn prompt_input_ref(&self) -> PromptInputRef {
170 self.prompt_input(id!(prompt))
171 }
172
173 pub fn messages_ref(&self) -> MessagesRef {
175 self.messages(id!(messages))
176 }
177
178 fn handle_prompt_input(&mut self, cx: &mut Cx, event: &Event) {
179 if self.prompt_input_ref().read().submitted(event.actions()) {
180 self.handle_submit(cx);
181 }
182
183 if self.prompt_input_ref().read().call_pressed(event.actions()) {
184 self.handle_call(cx);
185 }
186 }
187
188 fn handle_realtime(&mut self, cx: &mut Cx) {
189 if self.realtime(id!(realtime)).connection_requested() {
190 self.dispatch(cx, &mut ChatTask::Send.into());
191 }
192 }
193
194 fn handle_modal_dismissal(&mut self, cx: &mut Cx, event: &Event) {
195 for action in event.actions() {
197 if let RealtimeModalAction::DismissModal = action.cast() {
198 self.moly_modal(id!(audio_modal)).close(cx);
199 }
200 }
201
202 if self.moly_modal(id!(audio_modal)).dismissed(event.actions()) {
204 let mut conversation_messages =
206 self.realtime(id!(realtime)).take_conversation_messages();
207
208 self.realtime(id!(realtime)).reset_state(cx);
210
211 if !conversation_messages.is_empty() {
213 let mut all_messages = self.messages_ref().read().messages.clone();
215
216 let system_message = Message {
219 from: EntityId::App,
220 content: MessageContent {
221 text: "Voice call started.".to_string(),
222 ..Default::default()
223 },
224 ..Default::default()
225 };
226 conversation_messages.insert(0, system_message);
227
228 let system_message = Message {
229 from: EntityId::App,
230 content: MessageContent {
231 text: "Voice call ended.".to_string(),
232 ..Default::default()
233 },
234 ..Default::default()
235 };
236 conversation_messages.push(system_message);
237
238 all_messages.extend(conversation_messages);
239 self.dispatch(
240 cx,
241 &mut vec![
242 ChatTask::SetMessages(all_messages),
243 ChatTask::ScrollToBottom(true),
244 ],
245 );
246 }
247 }
248 }
249
250 fn handle_capabilities(&mut self, cx: &mut Cx) {
251 if let (Some(bot_context), Some(bot_id)) = (&self.bot_context, &self.bot_id) {
252 if let Some(bot) = bot_context.get_bot(bot_id) {
253 self.prompt_input_ref()
254 .write()
255 .set_bot_capabilities(cx, Some(bot.capabilities.clone()));
256 } else if self.bot_id.is_none() {
257 self.prompt_input_ref()
258 .write()
259 .set_bot_capabilities(cx, None);
260 }
261 }
262 }
263
264 fn handle_scrolling(&mut self) {
265 if self.expected_message.is_some() {
267 self.user_scrolled_during_stream = self.messages_ref().read().user_scrolled();
268 }
269 }
270
271 fn handle_messages(&mut self, cx: &mut Cx, event: &Event) {
272 for action in event.actions() {
273 let Some(action) = action.as_widget_action() else {
274 continue;
275 };
276
277 if action.widget_uid != self.messages_ref().widget_uid() {
278 continue;
279 }
280
281 match action.cast::<MessagesAction>() {
282 MessagesAction::Delete(index) => {
283 self.dispatch(cx, &mut ChatTask::DeleteMessage(index).into());
284 }
285 MessagesAction::Copy(index) => {
286 self.dispatch(cx, &mut ChatTask::CopyMessage(index).into());
287 }
288 MessagesAction::EditSave(index) => {
289 let mut tasks = self.messages_ref().read_with(|m| {
290 let mut message = m.messages[index].clone();
291 message.update_content(|content| {
292 content.text = m.current_editor_text().expect("no editor text");
293 });
294 ChatTask::UpdateMessage(index, message).into()
295 });
296
297 self.dispatch(cx, &mut tasks);
298 }
299 MessagesAction::EditRegenerate(index) => {
300 let mut tasks = self.messages_ref().read_with(|m| {
301 let mut messages = m.messages[0..=index].to_vec();
302
303 let index = m.current_editor_index().expect("no editor index");
304 let text = m.current_editor_text().expect("no editor text");
305
306 messages[index].update_content(|content| {
307 content.text = text;
308 });
309
310 vec![ChatTask::SetMessages(messages), ChatTask::Send]
311 });
312
313 self.dispatch(cx, &mut tasks);
314 }
315 MessagesAction::ToolApprove(index) => {
316 self.dispatch(cx, &mut ChatTask::ApproveToolCalls(index).into());
317 }
318 MessagesAction::ToolDeny(index) => {
319 self.dispatch(cx, &mut ChatTask::DenyToolCalls(index).into());
320 }
321 MessagesAction::None => {}
322 }
323 }
324 }
325
326 fn handle_submit(&mut self, cx: &mut Cx) {
327 let prompt = self.prompt_input_ref();
328
329 if prompt.read().has_send_task() {
330 let next_index = self.messages_ref().read().messages.len();
331 let text = prompt.text();
332 let attachments = prompt
333 .read()
334 .attachment_list_ref()
335 .read()
336 .attachments
337 .clone();
338 let mut composition = Vec::new();
339
340 if !text.is_empty() || !attachments.is_empty() {
341 composition.push(ChatTask::InsertMessage(
342 next_index,
343 Message {
344 from: EntityId::User,
345 content: MessageContent {
346 text,
347 attachments,
348 ..Default::default()
349 },
350 ..Default::default()
351 },
352 ));
353 }
354
355 composition.extend([ChatTask::Send, ChatTask::ClearPrompt]);
356
357 self.dispatch(cx, &mut composition);
358 } else if prompt.read().has_stop_task() {
359 self.dispatch(cx, &mut ChatTask::Stop.into());
360 }
361 }
362
363 fn handle_call(&mut self, cx: &mut Cx) {
364 self.dispatch(cx, &mut ChatTask::Send.into());
367 }
368
369 fn handle_tool_calls(
370 &mut self,
371 _cx: &mut Cx,
372 tool_calls: Vec<ToolCall>,
373 loading_message_index: usize,
374 ) {
375 let context = self
376 .bot_context
377 .as_ref()
378 .expect("no BotContext provided")
379 .clone();
380
381 let ui = self.ui_runner();
382 let future = async move {
383 let Some(tool_manager) = context.tool_manager() else {
385 ui.defer_with_redraw(move |me, cx, _| {
386 let error_message = Message {
387 from: EntityId::Tool,
388 content: MessageContent {
389 text: "Tool execution failed: Tool manager not available".to_string(),
390 ..Default::default()
391 },
392 metadata: MessageMetadata {
393 is_writing: false,
394 ..MessageMetadata::new()
395 },
396 ..Default::default()
397 };
398 me.dispatch(
399 cx,
400 &mut vec![ChatTask::UpdateMessage(
401 loading_message_index,
402 error_message,
403 )],
404 );
405 });
406 return;
407 };
408
409 let tool_results = tool_manager.execute_tool_calls(tool_calls.clone()).await;
411
412 ui.defer_with_redraw(move |me, cx, _| {
414 let results_text = if tool_results.len() == 1 {
416 let result = &tool_results[0];
417 let tool_name = tool_calls
418 .iter()
419 .find(|tc| tc.id == result.tool_call_id)
420 .map(|tc| tc.name.as_str())
421 .unwrap_or("unknown");
422
423 let display_name = display_name_from_namespaced(tool_name);
424 if result.is_error {
425 format!("🔧 Tool '{}' failed:\n{}", display_name, result.content)
426 } else {
427 let summary = crate::utils::tool_execution::create_tool_output_summary(
428 tool_name,
429 &result.content,
430 );
431 format!(
432 "🔧 Tool '{}' executed successfully:\n`{}`",
433 display_name, summary
434 )
435 }
436 } else {
437 let mut text = format!("🔧 Executed {} tools:\n\n", tool_results.len());
438 for result in &tool_results {
439 let tool_name = tool_calls
440 .iter()
441 .find(|tc| tc.id == result.tool_call_id)
442 .map(|tc| tc.name.as_str())
443 .unwrap_or("unknown");
444
445 let display_name = display_name_from_namespaced(tool_name);
446 if result.is_error {
447 text.push_str(&format!(
448 "**{}** ❌: {}\n\n",
449 display_name, result.content
450 ));
451 } else {
452 let summary = crate::utils::tool_execution::create_tool_output_summary(
453 tool_name,
454 &result.content,
455 );
456 text.push_str(&format!("**{}** ✅: `{}`\n\n", display_name, summary));
457 }
458 }
459 text
460 };
461
462 let updated_message = Message {
464 from: EntityId::Tool, content: MessageContent {
466 text: results_text,
467 tool_results,
468 ..Default::default()
469 },
470 metadata: MessageMetadata {
471 is_writing: false, ..MessageMetadata::new()
473 },
474 ..Default::default()
475 };
476
477 me.dispatch(
478 cx,
479 &mut vec![
480 ChatTask::UpdateMessage(loading_message_index, updated_message),
481 ChatTask::Send, ],
483 );
484 });
485 };
486
487 spawn(future);
488 }
489
490 fn handle_send_task(&mut self, cx: &mut Cx) {
491 self.abort();
493
494 let bot_id = self.bot_id.clone().expect("no bot selected");
496
497 let context = self
498 .bot_context
499 .as_ref()
500 .expect("no BotContext provided")
501 .clone();
502
503 if context.get_bot(&bot_id).is_none() {
505 let next_index = self.messages_ref().read().messages.len();
507 let error_message = format!(
508 "App error: Bot not found. The bot might have been disabled or removed. Bot ID: {}",
509 bot_id
510 );
511
512 let message = Message {
513 from: EntityId::App,
514 content: MessageContent {
515 text: error_message,
516 ..Default::default()
517 },
518 ..Default::default()
519 };
520
521 self.dispatch(cx, &mut vec![ChatTask::InsertMessage(next_index, message)]);
522 return;
523 }
524
525 let messages_history_context: Vec<Message> = self.messages_ref().write_with(|messages| {
526 messages.bot_context = Some(context.clone());
527
528 messages
529 .messages
530 .iter()
531 .filter(|m| m.metadata.is_idle() && m.from != EntityId::App)
532 .cloned()
533 .collect()
534 });
535
536 let bot = context.get_bot(&bot_id).unwrap(); if !bot.capabilities.supports_realtime() {
540 let loading_message = Message {
541 from: EntityId::Bot(bot_id.clone()),
542 metadata: MessageMetadata {
543 is_writing: true,
544 ..MessageMetadata::new()
545 },
546 ..Default::default()
547 };
548
549 let next_index = self.messages_ref().read().messages.len();
550 self.dispatch(
551 cx,
552 &mut vec![ChatTask::InsertMessage(next_index, loading_message)],
553 );
554 }
555
556 self.dispatch(cx, &mut vec![ChatTask::ScrollToBottom(false)]);
557 self.prompt_input_ref().write().set_stop();
558 self.redraw(cx);
559
560 let ui = self.ui_runner();
561 let future = async move {
562 let mut client = context.client();
563 let bot = match context.get_bot(&bot_id) {
564 Some(bot) => bot,
565 None => {
566 let bot_id_clone = bot_id.clone(); ui.defer_with_redraw(move |me, cx, _| {
569 let error_message = format!(
570 "App error: Bot not found during stream initialization. Bot ID: {}",
571 bot_id_clone
572 );
573 let next_index = me.messages_ref().read().messages.len();
574 let message = Message {
575 from: EntityId::App,
576 content: MessageContent {
577 text: error_message,
578 ..Default::default()
579 },
580 ..Default::default()
581 };
582 me.dispatch(cx, &mut vec![ChatTask::InsertMessage(next_index, message)]);
583 });
584 return;
585 }
586 };
587
588 let tools = if let Some(tool_manager) = context.tool_manager() {
589 tool_manager.get_all_namespaced_tools()
590 } else {
591 Vec::new()
592 };
593
594 let message_stream = amortize(client.send(&bot.id, &messages_history_context, &tools));
595 let mut message_stream = std::pin::pin!(message_stream);
596 while let Some(result) = message_stream.next().await {
597 let should_break = ui
607 .defer_with_redraw_async(move |me, cx, _| me.handle_message_delta(cx, result))
608 .await;
609
610 if should_break.unwrap_or(true) {
611 break;
612 }
613 }
614 };
615
616 let (future, abort_handle) = futures::future::abortable(future);
617 self.abort_handle = Some(abort_handle);
618
619 spawn(async move {
620 if future.await.is_ok() {
627 ui.defer_with_redraw(|me, _, _| me.clean_streaming_artifacts());
628 }
629 });
630 }
631
632 fn handle_stop_task(&mut self, cx: &mut Cx) {
633 self.abort();
634 self.redraw(cx);
635 }
636
637 fn abort(&mut self) {
640 self.abort_handle.take().map(|handle| handle.abort());
641 self.clean_streaming_artifacts();
642 }
643
644 pub fn dispatch(&mut self, cx: &mut Cx, tasks: &mut Vec<ChatTask>) {
655 if self.is_hooking {
657 self.pending_tasks.extend(tasks.iter().cloned());
658 return;
659 }
660
661 self.is_hooking = true;
662
663 if let Some(mut hook) = self.hook_before.take() {
664 hook(tasks, self, cx);
665 self.hook_before = Some(hook);
666 }
667
668 for task in tasks.iter() {
669 self.handle_task(cx, task);
670 }
671
672 if let Some(mut hook) = self.hook_after.take() {
673 hook(tasks, self, cx);
674 self.hook_after = Some(hook);
675 }
676
677 self.is_hooking = false;
678
679 if !self.pending_tasks.is_empty() {
681 let mut pending = std::mem::take(&mut self.pending_tasks);
682 self.dispatch(cx, &mut pending);
683 }
684 }
685
686 pub fn perform(&mut self, cx: &mut Cx, tasks: &[ChatTask]) {
690 for task in tasks {
691 self.handle_task(cx, &task);
692 }
693 }
694
695 fn handle_task(&mut self, cx: &mut Cx, task: &ChatTask) {
696 match task {
697 ChatTask::CopyMessage(index) => {
698 self.messages_ref().read_with(|m| {
699 let text = &m.messages[*index].content.text;
700 cx.copy_to_clipboard(text);
701 });
702 }
703 ChatTask::DeleteMessage(index) => {
704 self.messages_ref().write().messages.remove(*index);
705 self.redraw(cx);
706 }
707 ChatTask::InsertMessage(index, message) => {
708 self.messages_ref()
709 .write()
710 .messages
711 .insert(*index, message.clone());
712 self.redraw(cx);
713 }
714 ChatTask::Send => {
715 self.handle_send_task(cx);
716 }
717 ChatTask::Stop => {
718 self.handle_stop_task(cx);
719 }
720 ChatTask::UpdateMessage(index, message) => {
721 self.messages_ref().write_with(|m| {
722 let new_message = message.clone();
723 let old_message = m.messages.get_mut(*index).expect("no message at index");
724
725 *old_message = new_message;
726 m.set_message_editor_visibility(*index, false);
727 });
728
729 self.redraw(cx);
730 }
731 ChatTask::SetMessages(messages) => {
732 self.messages_ref().write_with(|m| {
733 m.messages = messages.clone();
734
735 if let Some(index) = m.current_editor_index() {
736 m.set_message_editor_visibility(index, false);
737 }
738 });
739
740 self.redraw(cx);
741 }
742 ChatTask::ClearPrompt => {
743 self.prompt_input_ref().write().reset(cx);
744 }
745 ChatTask::ScrollToBottom(triggered_by_stream) => {
746 self.messages_ref()
747 .write()
748 .scroll_to_bottom(cx, *triggered_by_stream);
749 }
750 ChatTask::ApproveToolCalls(index) => {
751 let mut message_updated = None;
753 let tool_calls = self.messages_ref().write_with(|m| {
754 if let Some(message) = m.messages.get_mut(*index) {
755 message.update_content(|content| {
756 for tool_call in &mut content.tool_calls {
757 tool_call.permission_status = ToolCallPermissionStatus::Approved;
758 }
759 });
760 message_updated = Some(message.clone());
761 message.content.tool_calls.clone()
762 } else {
763 Vec::new()
764 }
765 });
766
767 if let Some(message) = message_updated {
768 self.dispatch(cx, &mut vec![ChatTask::UpdateMessage(*index, message)]);
769 }
770
771 if !tool_calls.is_empty() {
772 let next_index = self.messages_ref().read().messages.len();
774 let loading_text = if tool_calls.len() == 1 {
775 format!(
776 "Executing tool '{}'...",
777 display_name_from_namespaced(&tool_calls[0].name)
778 )
779 } else {
780 format!("Executing {} tools...", tool_calls.len())
781 };
782
783 let loading_message = Message {
784 from: EntityId::Tool,
785 content: MessageContent {
786 text: loading_text,
787 ..Default::default()
788 },
789 metadata: MessageMetadata {
790 is_writing: true,
791 ..MessageMetadata::new()
792 },
793 ..Default::default()
794 };
795
796 self.dispatch(
797 cx,
798 &mut vec![ChatTask::InsertMessage(next_index, loading_message)],
799 );
800
801 self.handle_tool_calls(cx, tool_calls, next_index);
802 } else {
803 ::log::error!("No tool calls found at index: {}", index);
804 }
805
806 self.redraw(cx);
807 }
808 ChatTask::DenyToolCalls(index) => {
809 let mut message_updated = None;
811 let tool_calls = self.messages_ref().write_with(|m| {
812 if let Some(message) = m.messages.get_mut(*index) {
813 message.update_content(|content| {
814 for tool_call in &mut content.tool_calls {
815 tool_call.permission_status = ToolCallPermissionStatus::Denied;
816 }
817 });
818 message_updated = Some(message.clone());
819 message.content.tool_calls.clone()
820 } else {
821 Vec::new()
822 }
823 });
824
825 if let Some(message) = message_updated {
826 self.dispatch(cx, &mut vec![ChatTask::UpdateMessage(*index, message)]);
827 }
828
829 if !tool_calls.is_empty() {
830 let tool_results: Vec<ToolResult> = tool_calls.iter().map(|tc| {
832 let display_name = display_name_from_namespaced(&tc.name);
833 ToolResult {
834 tool_call_id: tc.id.clone(),
835 content: format!("Tool execution was denied by the user. Tool '{}' was not executed.", display_name),
836 is_error: true,
837 }
838 }).collect();
839
840 let next_index = self.messages_ref().read().messages.len();
841
842 let tool_message = Message {
844 from: EntityId::Tool,
845 content: MessageContent {
846 text: "🚫 Tool execution was denied by the user.".to_string(),
847 tool_results,
848 ..Default::default()
849 },
850 ..Default::default()
851 };
852
853 self.dispatch(
855 cx,
856 &mut vec![
857 ChatTask::InsertMessage(next_index, tool_message),
858 ChatTask::Send,
859 ],
860 );
861 }
862
863 self.redraw(cx);
864 }
865 }
866 }
867
868 pub fn set_hook_before(
875 &mut self,
876 hook: impl FnMut(&mut Vec<ChatTask>, &mut Chat, &mut Cx) + 'static,
877 ) {
878 if self.is_hooking {
879 panic!("Cannot set a hook while hooking");
880 }
881
882 self.hook_before = Some(Box::new(hook));
883 }
884
885 pub fn set_hook_after(&mut self, hook: impl FnMut(&[ChatTask], &mut Chat, &mut Cx) + 'static) {
889 if self.is_hooking {
890 panic!("Cannot set a hook while hooking");
891 }
892
893 self.hook_after = Some(Box::new(hook));
894 }
895
896 fn clean_streaming_artifacts(&mut self) {
901 self.abort_handle = None;
902 self.expected_message = None;
903 self.user_scrolled_during_stream = false;
904 self.messages_ref().write().reset_scroll_state();
905 self.prompt_input_ref().write().set_send();
906 self.messages_ref().write().messages.retain_mut(|m| {
907 m.metadata.is_writing = false;
908 !m.content.is_empty()
909 });
910 }
911
912 fn handle_message_delta(&mut self, cx: &mut Cx, result: ClientResult<MessageContent>) -> bool {
916 let messages = self.messages_ref();
917
918 match result.into_result() {
921 Ok(content) => {
922 if let Some(Upgrade::Realtime(channel)) = &content.upgrade {
924 self.clean_streaming_artifacts();
926
927 let mut realtime = self.realtime(id!(realtime));
929 realtime.set_bot_entity_id(
930 cx,
931 EntityId::Bot(self.bot_id.clone().unwrap_or_default()),
932 );
933 realtime.set_realtime_channel(channel.clone());
934 realtime.set_bot_context(self.bot_context.clone());
935
936 let modal = self.moly_modal(id!(audio_modal));
937 modal.open(cx);
938
939 return true;
941 }
942
943 let Some(mut message) = messages.read().messages.last().cloned() else {
945 return true;
946 };
947
948 if let Some(expected_message) = self.expected_message.as_ref() {
950 if message.from != expected_message.from
951 || message.content != expected_message.content
952 || message.metadata.is_writing != expected_message.metadata.is_writing
953 || message.metadata.created_at != expected_message.metadata.created_at
954 {
955 log!("Unexpected message to put delta in. Stopping.");
956 return true;
957 }
958 }
959
960 message.set_content(content);
961
962 let index = messages.read().messages.len() - 1;
963 let mut tasks = vec![ChatTask::UpdateMessage(index, message.clone())];
964
965 if !self.user_scrolled_during_stream {
967 tasks.push(ChatTask::ScrollToBottom(true));
968 }
969
970 self.dispatch(cx, &mut tasks);
971
972 let Some(ChatTask::UpdateMessage(_, updated_message)) = tasks.into_iter().next()
973 else {
974 return true;
976 };
977
978 if !updated_message.content.tool_calls.is_empty() {
979 let mut final_message = updated_message.clone();
981 final_message.metadata.is_writing = false;
982 self.dispatch(cx, &mut vec![ChatTask::UpdateMessage(index, final_message)]);
983 let dangerous_mode_enabled = self
986 .bot_context
987 .as_ref()
988 .map(|ctx| {
989 ctx.tool_manager()
990 .map(|tm| tm.get_dangerous_mode_enabled())
991 .unwrap_or(false)
992 })
993 .unwrap_or(false);
994
995 if dangerous_mode_enabled {
996 self.dispatch(cx, &mut vec![ChatTask::ApproveToolCalls(index)]);
998 }
999
1000 return true;
1002 }
1003
1004 self.expected_message = Some(updated_message);
1005
1006 false
1007 }
1008 Err(errors) => {
1009 let mut tasks = errors
1010 .into_iter()
1011 .enumerate()
1012 .map(|(i, e)| {
1013 ChatTask::InsertMessage(
1014 messages.read().messages.len() + i,
1015 Message {
1016 from: EntityId::App,
1017 content: MessageContent {
1018 text: e.to_string(),
1019 ..Default::default()
1020 },
1021 ..Default::default()
1022 },
1023 )
1024 })
1025 .collect::<Vec<_>>();
1026
1027 if !self.user_scrolled_during_stream {
1029 tasks.push(ChatTask::ScrollToBottom(true));
1030 }
1031
1032 self.dispatch(cx, &mut tasks);
1033 true
1034 }
1035 }
1036 }
1037
1038 pub fn is_streaming(&self) -> bool {
1040 if let Some(message) = self.messages_ref().read().messages.last() {
1041 message.metadata.is_writing
1042 } else {
1043 false
1044 }
1045 }
1046
1047 pub fn set_bot_id(&mut self, cx: &mut Cx, bot_id: Option<BotId>) {
1048 self.bot_id = bot_id;
1049 self.handle_capabilities(cx);
1050 }
1051
1052 pub fn bot_id(&self) -> Option<&BotId> {
1053 self.bot_id.as_ref()
1054 }
1055
1056 pub fn set_bot_context(&mut self, cx: &mut Cx, bot_context: Option<BotContext>) {
1057 self.bot_context = bot_context;
1058 self.handle_capabilities(cx);
1059 }
1060
1061 pub fn bot_context(&self) -> Option<&BotContext> {
1062 self.bot_context.as_ref()
1063 }
1064}
1065
1066impl ChatRef {
1069 pub fn read(&self) -> Ref<'_, Chat> {
1073 self.borrow().unwrap()
1074 }
1075
1076 pub fn write(&mut self) -> RefMut<'_, Chat> {
1080 self.borrow_mut().unwrap()
1081 }
1082
1083 pub fn read_with<R>(&self, f: impl FnOnce(&Chat) -> R) -> R {
1087 f(&*self.read())
1088 }
1089
1090 pub fn write_with<R>(&mut self, f: impl FnOnce(&mut Chat) -> R) -> R {
1094 f(&mut *self.write())
1095 }
1096}
1097
1098fn amortize(
1101 input: impl PlatformSendStream<Item = ClientResult<MessageContent>> + 'static,
1102) -> impl PlatformSendStream<Item = ClientResult<MessageContent>> + 'static {
1103 use crate::utils::string::AmortizedString;
1105 use async_stream::stream;
1106
1107 let mut amortized_text = AmortizedString::default();
1109 let mut amortized_reasoning = AmortizedString::default();
1110
1111 stream! {
1113 for await result in input {
1115 if result.has_errors() {
1117 yield result;
1118 return;
1119 }
1120
1121 let mut content = result.into_value().unwrap();
1123
1124 let text = std::mem::take(&mut content.text);
1127 amortized_text.update(text);
1128 content.text = amortized_text.current().to_string();
1129
1130 let reasoning = std::mem::take(&mut content.reasoning);
1132 amortized_reasoning.update(reasoning);
1133 content.reasoning = amortized_reasoning.current().to_string();
1134
1135 for reasoning in &mut amortized_reasoning {
1137 content.reasoning = reasoning;
1138 yield ClientResult::new_ok(content.clone());
1139 }
1140
1141 for text in &mut amortized_text {
1145 content.text = text;
1146 yield ClientResult::new_ok(content.clone());
1147 }
1148 }
1149 }
1150}