extras.py 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186
  1. from __future__ import annotations
  2. import time
  3. from typing import AsyncIterator, Dict, Iterator, Optional
  4. import httpx
  5. from .api.default import (
  6. app_agents,
  7. command_list,
  8. config_get,
  9. config_providers,
  10. file_status,
  11. path_get,
  12. project_current,
  13. project_list,
  14. session_list,
  15. tool_ids,
  16. )
  17. from .client import Client
  18. from .types import UNSET, Unset
  19. class OpenCodeClient:
  20. """High-level convenience wrapper around the generated Client.
  21. Provides sensible defaults and a couple of helper methods, with optional retries.
  22. """
  23. def __init__(
  24. self,
  25. base_url: str = "http://localhost:4096",
  26. *,
  27. headers: Optional[Dict[str, str]] = None,
  28. timeout: Optional[float] = None,
  29. verify_ssl: bool | str | httpx.URLTypes | None = True,
  30. token: Optional[str] = None,
  31. auth_header_name: str = "Authorization",
  32. auth_prefix: str = "Bearer",
  33. retries: int = 0,
  34. backoff_factor: float = 0.5,
  35. status_forcelist: tuple[int, ...] = (429, 500, 502, 503, 504),
  36. ) -> None:
  37. httpx_timeout = None if timeout is None else httpx.Timeout(timeout)
  38. all_headers = dict(headers or {})
  39. if token:
  40. all_headers[auth_header_name] = f"{auth_prefix} {token}".strip()
  41. self._client = Client(
  42. base_url=base_url,
  43. headers=all_headers,
  44. timeout=httpx_timeout,
  45. verify_ssl=verify_ssl if isinstance(verify_ssl, bool) else True,
  46. )
  47. self._retries = max(0, int(retries))
  48. self._backoff = float(backoff_factor)
  49. self._status_forcelist = set(status_forcelist)
  50. @property
  51. def client(self) -> Client:
  52. return self._client
  53. # ---- Internal retry helper ----
  54. def _call_with_retries(self, fn, *args, **kwargs):
  55. attempt = 0
  56. while True:
  57. try:
  58. return fn(*args, **kwargs)
  59. except httpx.RequestError:
  60. pass
  61. except httpx.HTTPStatusError as e:
  62. if e.response is None or e.response.status_code not in self._status_forcelist:
  63. raise
  64. if attempt >= self._retries:
  65. # re-raise last exception if we have one
  66. raise
  67. sleep = self._backoff * (2**attempt)
  68. time.sleep(sleep)
  69. attempt += 1
  70. # ---- Convenience wrappers over generated endpoints ----
  71. def list_sessions(self, *, directory: str | Unset = UNSET):
  72. """Return sessions in the current project.
  73. Wraps GET /session. Pass `directory` to target a specific project/directory if needed.
  74. """
  75. return self._call_with_retries(session_list.sync, client=self._client, directory=directory)
  76. def get_config(self, *, directory: str | Unset = UNSET):
  77. """Return opencode configuration for the current project (GET /config)."""
  78. return self._call_with_retries(config_get.sync, client=self._client, directory=directory)
  79. def list_agents(self, *, directory: str | Unset = UNSET):
  80. """List configured agents (GET /agent)."""
  81. return self._call_with_retries(app_agents.sync, client=self._client, directory=directory)
  82. def list_projects(self, *, directory: str | Unset = UNSET):
  83. """List known projects (GET /project)."""
  84. return self._call_with_retries(project_list.sync, client=self._client, directory=directory)
  85. def current_project(self, *, directory: str | Unset = UNSET):
  86. """Return current project (GET /project/current)."""
  87. return self._call_with_retries(project_current.sync, client=self._client, directory=directory)
  88. def file_status(self, *, directory: str | Unset = UNSET):
  89. """Return file status list (GET /file/status)."""
  90. return self._call_with_retries(file_status.sync, client=self._client, directory=directory)
  91. def get_path(self, *, directory: str | Unset = UNSET):
  92. """Return opencode path info (GET /path)."""
  93. return self._call_with_retries(path_get.sync, client=self._client, directory=directory)
  94. def config_providers(self, *, directory: str | Unset = UNSET):
  95. """Return configured providers (GET /config/providers)."""
  96. return self._call_with_retries(config_providers.sync, client=self._client, directory=directory)
  97. def tool_ids(self, *, directory: str | Unset = UNSET):
  98. """Return tool identifiers for a provider/model pair (GET /experimental/tool)."""
  99. return self._call_with_retries(tool_ids.sync, client=self._client, directory=directory)
  100. def list_commands(self, *, directory: str | Unset = UNSET):
  101. """List commands (GET /command)."""
  102. return self._call_with_retries(command_list.sync, client=self._client, directory=directory)
  103. # ---- Server-Sent Events (SSE) streaming ----
  104. def subscribe_events(self, *, directory: str | Unset = UNSET) -> Iterator[dict]:
  105. """Subscribe to /event SSE endpoint and yield parsed JSON events.
  106. This is a blocking generator which yields one event dict per message.
  107. """
  108. client = self._client.get_httpx_client()
  109. params: dict[str, str] = {}
  110. if directory is not UNSET and directory is not None:
  111. params["directory"] = str(directory)
  112. with client.stream("GET", "/event", headers={"Accept": "text/event-stream"}, params=params) as r:
  113. r.raise_for_status()
  114. buf = ""
  115. for line_bytes in r.iter_lines():
  116. line = line_bytes.decode("utf-8") if isinstance(line_bytes, (bytes, bytearray)) else str(line_bytes)
  117. if line.startswith(":"):
  118. # comment/heartbeat
  119. continue
  120. if line == "":
  121. if buf:
  122. # end of event
  123. for part in buf.split("\n"):
  124. if part.startswith("data:"):
  125. data = part[5:].strip()
  126. if data:
  127. try:
  128. yield httpx._models.jsonlib.loads(data) # type: ignore[attr-defined]
  129. except Exception:
  130. # fall back: skip malformed
  131. pass
  132. buf = ""
  133. continue
  134. buf += line + "\n"
  135. async def subscribe_events_async(self, *, directory: str | Unset = UNSET) -> AsyncIterator[dict]:
  136. """Async variant of subscribe_events using httpx.AsyncClient."""
  137. aclient = self._client.get_async_httpx_client()
  138. params: dict[str, str] = {}
  139. if directory is not UNSET and directory is not None:
  140. params["directory"] = str(directory)
  141. async with aclient.stream("GET", "/event", headers={"Accept": "text/event-stream"}, params=params) as r:
  142. r.raise_for_status()
  143. buf = ""
  144. async for line_bytes in r.aiter_lines():
  145. line = line_bytes
  146. if line.startswith(":"):
  147. continue
  148. if line == "":
  149. if buf:
  150. for part in buf.split("\n"):
  151. if part.startswith("data:"):
  152. data = part[5:].strip()
  153. if data:
  154. try:
  155. yield httpx._models.jsonlib.loads(data) # type: ignore[attr-defined]
  156. except Exception:
  157. pass
  158. buf = ""
  159. continue
  160. buf += line + "\n"