stream.ts 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175
  1. // Thin bridge between reducer output and the footer API.
  2. //
  3. // The reducers produce StreamCommit[] and an optional FooterOutput (patch +
  4. // view + subagent state). This module forwards them to footer.append() and
  5. // footer.event() respectively, adding trace writes along the way. It also
  6. // defaults status updates to phase "running" if the caller didn't set a
  7. // phase -- a convenience so reducer code doesn't have to repeat that.
  8. import type { FooterApi, FooterOutput, FooterPatch, FooterSubagentState, StreamCommit } from "./types"
  9. type Trace = {
  10. write(type: string, data?: unknown): void
  11. }
  12. type OutputInput = {
  13. footer: FooterApi
  14. trace?: Trace
  15. }
  16. type StreamOutput = {
  17. commits: StreamCommit[]
  18. footer?: FooterOutput
  19. }
  20. // Default to "running" phase when a status string arrives without an explicit phase.
  21. function patch(next: FooterPatch): FooterPatch {
  22. if (typeof next.status === "string" && next.phase === undefined) {
  23. return {
  24. phase: "running",
  25. ...next,
  26. }
  27. }
  28. return next
  29. }
  30. function summarize(value: unknown): unknown {
  31. if (typeof value === "string") {
  32. if (value.length <= 160) {
  33. return value
  34. }
  35. return {
  36. type: "string",
  37. length: value.length,
  38. preview: `${value.slice(0, 160)}...`,
  39. }
  40. }
  41. if (Array.isArray(value)) {
  42. return {
  43. type: "array",
  44. length: value.length,
  45. }
  46. }
  47. if (!value || typeof value !== "object") {
  48. return value
  49. }
  50. return {
  51. type: "object",
  52. keys: Object.keys(value),
  53. }
  54. }
  55. function traceCommit(commit: StreamCommit) {
  56. return {
  57. ...commit,
  58. text: summarize(commit.text),
  59. textLength: commit.text.length,
  60. part: commit.part
  61. ? {
  62. id: commit.part.id,
  63. sessionID: commit.part.sessionID,
  64. messageID: commit.part.messageID,
  65. callID: commit.part.callID,
  66. tool: commit.part.tool,
  67. state: {
  68. status: commit.part.state.status,
  69. title: "title" in commit.part.state ? summarize(commit.part.state.title) : undefined,
  70. error: "error" in commit.part.state ? summarize(commit.part.state.error) : undefined,
  71. time: "time" in commit.part.state ? summarize(commit.part.state.time) : undefined,
  72. input: summarize(commit.part.state.input),
  73. metadata: "metadata" in commit.part.state ? summarize(commit.part.state.metadata) : undefined,
  74. },
  75. }
  76. : undefined,
  77. }
  78. }
  79. export function traceSubagentState(state: FooterSubagentState) {
  80. return {
  81. tabs: state.tabs,
  82. details: Object.fromEntries(
  83. Object.entries(state.details).map(([sessionID, detail]) => [
  84. sessionID,
  85. {
  86. sessionID,
  87. commits: detail.commits.map(traceCommit),
  88. },
  89. ]),
  90. ),
  91. permissions: state.permissions.map((item) => ({
  92. id: item.id,
  93. sessionID: item.sessionID,
  94. permission: item.permission,
  95. patterns: item.patterns,
  96. tool: item.tool,
  97. metadata: item.metadata
  98. ? {
  99. keys: Object.keys(item.metadata),
  100. input: summarize(item.metadata.input),
  101. }
  102. : undefined,
  103. })),
  104. questions: state.questions.map((item) => ({
  105. id: item.id,
  106. sessionID: item.sessionID,
  107. questions: item.questions.map((question) => ({
  108. header: question.header,
  109. question: question.question,
  110. options: question.options.length,
  111. multiple: question.multiple,
  112. })),
  113. })),
  114. }
  115. }
  116. export function traceFooterOutput(footer?: FooterOutput) {
  117. if (!footer?.subagent) {
  118. return footer
  119. }
  120. return {
  121. ...footer,
  122. subagent: traceSubagentState(footer.subagent),
  123. }
  124. }
  125. // Forwards reducer output to the footer: commits go to scrollback, patches update the status bar.
  126. export function writeSessionOutput(input: OutputInput, out: StreamOutput): void {
  127. for (const commit of out.commits) {
  128. input.trace?.write("ui.commit", commit)
  129. input.footer.append(commit)
  130. }
  131. if (out.footer?.patch) {
  132. const next = patch(out.footer.patch)
  133. input.trace?.write("ui.patch", next)
  134. input.footer.event({
  135. type: "stream.patch",
  136. patch: next,
  137. })
  138. }
  139. if (out.footer?.subagent) {
  140. input.trace?.write("ui.subagent", traceSubagentState(out.footer.subagent))
  141. input.footer.event({
  142. type: "stream.subagent",
  143. state: out.footer.subagent,
  144. })
  145. }
  146. if (!out.footer?.view) {
  147. return
  148. }
  149. input.trace?.write("ui.patch", {
  150. view: out.footer.view,
  151. })
  152. input.footer.event({
  153. type: "stream.view",
  154. view: out.footer.view,
  155. })
  156. }