| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186 |
- from __future__ import annotations
- import time
- from typing import AsyncIterator, Dict, Iterator, Optional
- import httpx
- from .api.default import (
- app_agents,
- command_list,
- config_get,
- config_providers,
- file_status,
- path_get,
- project_current,
- project_list,
- session_list,
- tool_ids,
- )
- from .client import Client
- from .types import UNSET, Unset
- class OpenCodeClient:
- """High-level convenience wrapper around the generated Client.
- Provides sensible defaults and a couple of helper methods, with optional retries.
- """
- def __init__(
- self,
- base_url: str = "http://localhost:4096",
- *,
- headers: Optional[Dict[str, str]] = None,
- timeout: Optional[float] = None,
- verify_ssl: bool | str | httpx.URLTypes | None = True,
- token: Optional[str] = None,
- auth_header_name: str = "Authorization",
- auth_prefix: str = "Bearer",
- retries: int = 0,
- backoff_factor: float = 0.5,
- status_forcelist: tuple[int, ...] = (429, 500, 502, 503, 504),
- ) -> None:
- httpx_timeout = None if timeout is None else httpx.Timeout(timeout)
- all_headers = dict(headers or {})
- if token:
- all_headers[auth_header_name] = f"{auth_prefix} {token}".strip()
- self._client = Client(
- base_url=base_url,
- headers=all_headers,
- timeout=httpx_timeout,
- verify_ssl=verify_ssl if isinstance(verify_ssl, bool) else True,
- )
- self._retries = max(0, int(retries))
- self._backoff = float(backoff_factor)
- self._status_forcelist = set(status_forcelist)
- @property
- def client(self) -> Client:
- return self._client
- # ---- Internal retry helper ----
- def _call_with_retries(self, fn, *args, **kwargs):
- attempt = 0
- while True:
- try:
- return fn(*args, **kwargs)
- except httpx.RequestError:
- pass
- except httpx.HTTPStatusError as e:
- if e.response is None or e.response.status_code not in self._status_forcelist:
- raise
- if attempt >= self._retries:
- # re-raise last exception if we have one
- raise
- sleep = self._backoff * (2**attempt)
- time.sleep(sleep)
- attempt += 1
- # ---- Convenience wrappers over generated endpoints ----
- def list_sessions(self, *, directory: str | Unset = UNSET):
- """Return sessions in the current project.
- Wraps GET /session. Pass `directory` to target a specific project/directory if needed.
- """
- return self._call_with_retries(session_list.sync, client=self._client, directory=directory)
- def get_config(self, *, directory: str | Unset = UNSET):
- """Return opencode configuration for the current project (GET /config)."""
- return self._call_with_retries(config_get.sync, client=self._client, directory=directory)
- def list_agents(self, *, directory: str | Unset = UNSET):
- """List configured agents (GET /agent)."""
- return self._call_with_retries(app_agents.sync, client=self._client, directory=directory)
- def list_projects(self, *, directory: str | Unset = UNSET):
- """List known projects (GET /project)."""
- return self._call_with_retries(project_list.sync, client=self._client, directory=directory)
- def current_project(self, *, directory: str | Unset = UNSET):
- """Return current project (GET /project/current)."""
- return self._call_with_retries(project_current.sync, client=self._client, directory=directory)
- def file_status(self, *, directory: str | Unset = UNSET):
- """Return file status list (GET /file/status)."""
- return self._call_with_retries(file_status.sync, client=self._client, directory=directory)
- def get_path(self, *, directory: str | Unset = UNSET):
- """Return opencode path info (GET /path)."""
- return self._call_with_retries(path_get.sync, client=self._client, directory=directory)
- def config_providers(self, *, directory: str | Unset = UNSET):
- """Return configured providers (GET /config/providers)."""
- return self._call_with_retries(config_providers.sync, client=self._client, directory=directory)
- def tool_ids(self, *, directory: str | Unset = UNSET):
- """Return tool identifiers for a provider/model pair (GET /experimental/tool)."""
- return self._call_with_retries(tool_ids.sync, client=self._client, directory=directory)
- def list_commands(self, *, directory: str | Unset = UNSET):
- """List commands (GET /command)."""
- return self._call_with_retries(command_list.sync, client=self._client, directory=directory)
- # ---- Server-Sent Events (SSE) streaming ----
- def subscribe_events(self, *, directory: str | Unset = UNSET) -> Iterator[dict]:
- """Subscribe to /event SSE endpoint and yield parsed JSON events.
- This is a blocking generator which yields one event dict per message.
- """
- client = self._client.get_httpx_client()
- params: dict[str, str] = {}
- if directory is not UNSET and directory is not None:
- params["directory"] = str(directory)
- with client.stream("GET", "/event", headers={"Accept": "text/event-stream"}, params=params) as r:
- r.raise_for_status()
- buf = ""
- for line_bytes in r.iter_lines():
- line = line_bytes.decode("utf-8") if isinstance(line_bytes, (bytes, bytearray)) else str(line_bytes)
- if line.startswith(":"):
- # comment/heartbeat
- continue
- if line == "":
- if buf:
- # end of event
- for part in buf.split("\n"):
- if part.startswith("data:"):
- data = part[5:].strip()
- if data:
- try:
- yield httpx._models.jsonlib.loads(data) # type: ignore[attr-defined]
- except Exception:
- # fall back: skip malformed
- pass
- buf = ""
- continue
- buf += line + "\n"
- async def subscribe_events_async(self, *, directory: str | Unset = UNSET) -> AsyncIterator[dict]:
- """Async variant of subscribe_events using httpx.AsyncClient."""
- aclient = self._client.get_async_httpx_client()
- params: dict[str, str] = {}
- if directory is not UNSET and directory is not None:
- params["directory"] = str(directory)
- async with aclient.stream("GET", "/event", headers={"Accept": "text/event-stream"}, params=params) as r:
- r.raise_for_status()
- buf = ""
- async for line_bytes in r.aiter_lines():
- line = line_bytes
- if line.startswith(":"):
- continue
- if line == "":
- if buf:
- for part in buf.split("\n"):
- if part.startswith("data:"):
- data = part[5:].strip()
- if data:
- try:
- yield httpx._models.jsonlib.loads(data) # type: ignore[attr-defined]
- except Exception:
- pass
- buf = ""
- continue
- buf += line + "\n"
|