PHO-1805 Reconnect to conversation automatically in client #122
elizabeth-phonic wants to merge 10 commits into main
Conversation
Walkthrough
Adds an opt-in reconnect-on-abnormal-disconnect flow for conversation WebSockets (close code 1006). A new flag flows through client wrappers and top-level clients; new websocket helpers and reconnectable socket client implementations centralize STS URL/header building, connection handling, and reconnection/resumption logic.
Sequence Diagram(s)
sequenceDiagram
participant User
participant Phonic
participant ConvClient as Conversations\nClient
participant WSHelper as WebSocket\nHelper
participant Reconn as Reconnectable\nSocketClient
participant WS as WebSocket
User->>Phonic: __init__(reconnect_...=True)
Phonic->>Phonic: store flag on ClientWrapper
User->>ConvClient: connect(...)
ConvClient->>ConvClient: check wrapper._reconnect_conversation_on_abnormal_disconnect
alt reconnect enabled
ConvClient->>WSHelper: open_reconnectable_conversations_socket(...)
WSHelper->>WS: connect to STS /v1/sts/ws
WS-->>WSHelper: connection established
WSHelper->>Reconn: wrap initial conn -> ReconnectableSocket
Reconn-->>ConvClient: return reconnectable socket
User->>Reconn: recv()/__iter__()
Reconn->>WS: receive events
alt abnormal close (1006)
WS-->>Reconn: ConnectionClosed(1006)
Reconn->>Reconn: extract conversation_id
Reconn->>WSHelper: build URL with reconnect_conv_id
WSHelper->>WS: reconnect attempts (backoff)
WS-->>Reconn: new connection -> resume stream
else normal/user close
Reconn-->>User: emit close, stop
end
else reconnect disabled
ConvClient->>WSHelper: open_conversations_socket(...)
WSHelper->>WS: connect to STS
WS-->>ConvClient: return basic socket
end
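The receive path in the diagram above can be sketched as a small loop. This is a minimal illustration under stated assumptions, not the PR's implementation: `FakeConnectionClosed`, `FakeSocket`, `backoff_delay_sec`, and `receive_all` are hypothetical names, the backoff constants are invented, and resetting the retry counter after a successful read is an assumption.

```python
import time
import typing


class FakeConnectionClosed(Exception):
    """Hypothetical stand-in for the websockets ConnectionClosed exception."""

    def __init__(self, code: int) -> None:
        super().__init__(f"connection closed with code {code}")
        self.code = code


class FakeSocket:
    """Hypothetical socket that replays a script of messages and close codes."""

    def __init__(self, script: typing.List[typing.Union[str, int]]) -> None:
        self._script = list(script)

    def recv(self) -> str:
        if not self._script:
            raise FakeConnectionClosed(1000)  # script exhausted: normal close
        item = self._script.pop(0)
        if isinstance(item, int):
            raise FakeConnectionClosed(item)
        return item


def backoff_delay_sec(attempt: int, base: float = 0.01, cap: float = 0.05) -> float:
    """Exponential backoff with a cap; the constants are illustrative only."""
    return min(cap, base * (2 ** (attempt - 1)))


def receive_all(
    connect: typing.Callable[[], FakeSocket],
    max_attempts: int = 3,
    sleep: typing.Callable[[float], None] = time.sleep,
) -> typing.List[str]:
    """Drain messages, reconnecting only after an abnormal close (1006)."""
    sock = connect()
    received: typing.List[str] = []
    attempts = 0
    while True:
        try:
            received.append(sock.recv())
            attempts = 0  # assumption: a successful read resets the counter
        except FakeConnectionClosed as exc:
            if exc.code == 1000:
                return received  # normal close: stop quietly
            if exc.code != 1006 or attempts >= max_attempts:
                raise  # not retryable, or retries exhausted
            attempts += 1
            sleep(backoff_delay_sec(attempts))
            sock = connect()
```

Passing `sleep` as a parameter keeps the loop easy to exercise without real delays.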
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
Pre-merge checks: ✅ 2 passed | ❌ 1 failed (1 warning)
Actionable comments posted: 3
🧹 Nitpick comments (5)
src/phonic/conversations/raw_client.py (1)
1136-1139: Docstring return type inconsistent with actual return type.
The docstring states ConversationsSocketClient but the actual return type annotation is typing.Union[ConversationsSocketClient, ReconnectableConversationsSocketClient]. Consider updating the docstring to reflect the union type.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/phonic/conversations/raw_client.py` around lines 1136 - 1139, Update the docstring return type to match the function's annotated return type: change the Returns section to indicate typing.Union[ConversationsSocketClient, ReconnectableConversationsSocketClient] (or a readable union like "ConversationsSocketClient | ReconnectableConversationsSocketClient") so it aligns with the actual return annotation; reference the types ConversationsSocketClient and ReconnectableConversationsSocketClient and ensure the docstring wording matches the function's signature.
src/phonic/conversations/reconnectable_socket_client.py (3)
284-297: Sync recv() lacks lock protection compared to async version.
The async version at lines 147-150 acquires the lock before checking _user_closed and calling _reconnect(). The sync version here doesn't use any lock, which could lead to race conditions if close() is called concurrently:
- Thread A: recv() starts reconnecting, sleeps at line 294
- Thread B: close() sets _user_closed = True and closes connection
- Thread A: wakes up, checks _user_closed (True), but _reconnect() isn't called with any synchronization
Consider adding thread-safe synchronization matching the async version's pattern.
Suggested fix (assuming lock is added to __init__)
 def recv(self) -> typing.Any:
     while True:
         try:
             msg = self._inner.recv()
             self._observe_message(msg)
             return msg
         except ConnectionClosed as exc:
             if not self._should_reconnect(exc):
                 raise
             self._reconnect_attempts += 1
             time.sleep(_reconnect_delay_sec(self._reconnect_attempts))
-            if self._user_closed:
-                raise exc
-            self._reconnect()
+            with self._lock:
+                if self._user_closed:
+                    raise exc
+                self._reconnect()
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/phonic/conversations/reconnectable_socket_client.py` around lines 284 - 297, The sync recv() is missing lock protection around the _user_closed check and _reconnect() call; add a thread lock (e.g., self._lock created in __init__) and, inside the except ConnectionClosed block, acquire the lock before checking self._user_closed and invoking self._reconnect(), then release the lock after _reconnect() (or use a context manager) to mirror the async version's synchronization in recv() and prevent races with close().
86-93: Consider logging suppressed exceptions for debugging.
The static analysis tool flags the try-except-pass pattern here. While swallowing exceptions during message observation is acceptable (non-critical path), consider adding debug logging to aid troubleshooting.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/phonic/conversations/reconnectable_socket_client.py` around lines 86 - 93, The except-pass in _observe_message suppresses errors and loses context; modify the except block in _observe_message to log the caught exception (including stacktrace) before continuing, e.g., call an internal logger like self._logger.exception or self._logger.debug with the exception details and a short message referencing _observe_json, then continue to swallow the exception so behavior is unchanged. Ensure you reference _observe_message and _observe_json when adding the log entry so the log clearly ties to this code path.
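As a rough illustration of the logging suggestion, the sketch below keeps the exception swallowed but records the traceback at debug level. `MessageObserver`, the logger name, and the `conversation_id` payload field are assumptions made for this example, not the PR's actual code.

```python
import json
import logging
import typing

logger = logging.getLogger("reconnect_sketch")  # hypothetical logger name


class MessageObserver:
    """Hypothetical observer that tracks conversation_id from JSON messages."""

    def __init__(self) -> None:
        self.conversation_id: typing.Optional[str] = None

    def _observe_json(self, payload: typing.Dict[str, typing.Any]) -> None:
        # Assumption: the server reports the id under a "conversation_id" key.
        conv_id = payload.get("conversation_id")
        if isinstance(conv_id, str):
            self.conversation_id = conv_id

    def observe_message(self, raw: typing.Any) -> None:
        try:
            self._observe_json(json.loads(raw))
        except Exception:
            # Still swallowed, but exc_info=True keeps the traceback at DEBUG.
            logger.debug("observe_message: _observe_json failed", exc_info=True)
```

Behavior is unchanged for callers; the only difference is that a debug-level handler can now see why a message was skipped.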
100-106: Consider logging suppressed exceptions during connection cleanup.
Same as above: swallowing exceptions during cleanup is acceptable, but logging at debug level would help diagnose issues in production.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/phonic/conversations/reconnectable_socket_client.py` around lines 100 - 106, In _close_current_connection, avoid silently swallowing exceptions from awaiting self._cm.__aexit__(None, None, None); catch Exception as e and log the exception at debug level so cleanup failures are recorded. Update the except block in _close_current_connection to capture the exception object and call the appropriate logger (e.g., self._logger.debug or module logger.debug) with a short message and the exception info so you still suppress the error but retain diagnostic details about failures during self._cm teardown.
src/phonic/client.py (1)
55-57: Minor docstring inconsistency with actual type signature.
The docstring indicates typing.Optional[bool] but the parameter is typed as bool = False. This is a minor inconsistency but could confuse users reading the documentation.
Suggested docstring fix
- reconnect_conversation_on_abnormal_disconnect : typing.Optional[bool]
+ reconnect_conversation_on_abnormal_disconnect : bool
      When True, the conversations WebSocket automatically reconnects after an abnormal close (code 1006) using reconnect_conv_id. Defaults to False until stable in production.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/phonic/client.py` around lines 55 - 57, Docstring for the parameter reconnect_conversation_on_abnormal_disconnect is inconsistent with its signature: update the docstring to declare the type as bool (and note the default is False) to match the parameter signature (reconnect_conversation_on_abnormal_disconnect: bool = False) in the client module, or alternatively change the function signature to typing.Optional[bool] if you intend to allow None; ensure the docstring and the symbol reconnect_conversation_on_abnormal_disconnect are consistent and mention the default behavior (defaults to False).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/phonic/conversations/reconnectable_socket_client.py`:
- Around line 198-220: Add a threading lock to the sync reconnectable client to
prevent races: in ReconnectableConversationsSocketClient.__init__ create
self._lock = threading.Lock(), then use this lock to guard the critical section
in close() around the call to self._close_current_connection(), and likewise
guard the reconnect block in recv() so the check of self._user_closed and the
subsequent call to self._reconnect() execute atomically; this mirrors the
asyncio.Lock protection used in the async version and should reference the
methods __init__, close, recv, _close_current_connection, and _reconnect.
In `@src/phonic/conversations/websocket_connect.py`:
- Around line 160-173: The initial async WebSocket connection using
websockets_client_connect and await cm.__aenter__ (variables initial_cm and
initial_protocol) lacks handling for InvalidWebSocketStatus; wrap the await
cm.__aenter__ call in a try/except that catches InvalidWebSocketStatus and
converts it to the same error/handling used by the non-reconnectable path (e.g.,
raise the existing ConversationWebsocketError or log and re-raise with the same
message), ensuring any opened cm is properly closed on failure and that
ReconnectableAsyncConversationsSocketClient is only constructed when the initial
connection succeeds.
- Around line 129-142: The initial WebSocket connect call can raise
InvalidWebSocketStatus and must be converted to an ApiError like the
non-reconnectable open_conversations_socket_sync; wrap the
websockets_sync_client.connect(...) (and the subsequent cm.__enter__()) in a
try/except that catches InvalidWebSocketStatus and raises ApiError with a
helpful message (include status/detail) so callers of
ReconnectableConversationsSocketClient receive the same user-friendly error;
reference the existing symbols cm, protocol, websockets_sync_client.connect,
InvalidWebSocketStatus, ApiError, and ReconnectableConversationsSocketClient
when making the change.
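The conversion described in the two inline comments above might look roughly like this. It is a sketch with stand-in types: `FakeInvalidStatus` and `FakeApiError` are hypothetical replacements for InvalidWebSocketStatus and ApiError, and `open_initial_connection` is an invented helper name. The two error bodies are the ones quoted elsewhere in this review.

```python
import typing

T = typing.TypeVar("T")


class FakeInvalidStatus(Exception):
    """Hypothetical stand-in for a rejected WebSocket handshake."""

    def __init__(self, status_code: int) -> None:
        super().__init__(f"handshake rejected with HTTP {status_code}")
        self.status_code = status_code


class FakeApiError(Exception):
    """Hypothetical stand-in for the SDK's ApiError."""

    def __init__(self, status_code: int, body: str) -> None:
        super().__init__(body)
        self.status_code = status_code
        self.body = body


def open_initial_connection(connect: typing.Callable[[], T]) -> T:
    """Run the first connect and translate handshake rejections."""
    try:
        return connect()
    except FakeInvalidStatus as exc:
        if exc.status_code == 401:
            body = "Websocket initialized with invalid credentials."
        else:
            body = "Unexpected error when initializing websocket connection."
        # Chain the original exception so the handshake detail is not lost.
        raise FakeApiError(exc.status_code, body) from exc
```

Because the translation happens before any reconnectable wrapper is built, the wrapper only ever sees a connection that was opened successfully.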
📒 Files selected for processing (7)
.fernignore
src/phonic/client.py
src/phonic/conversations/client.py
src/phonic/conversations/raw_client.py
src/phonic/conversations/reconnectable_socket_client.py
src/phonic/conversations/websocket_connect.py
src/phonic/core/client_wrapper.py
class ReconnectableConversationsSocketClient(EventEmitterMixin):
    """Sync conversations socket with automatic reconnect after close code 1006."""

    def __init__(
        self,
        *,
        initial_cm: typing.Any,
        initial_protocol: typing.Any,
        client_wrapper: BaseClientWrapper,
        downstream_websocket_url: typing.Optional[str],
        request_options: typing.Optional[RequestOptions],
        max_reconnect_attempts: int = 30,
    ) -> None:
        super().__init__()
        self._cm = initial_cm
        self._inner = ConversationsSocketClient(websocket=initial_protocol)
        self._client_wrapper = client_wrapper
        self._downstream_websocket_url = downstream_websocket_url
        self._request_options = request_options
        self._max_reconnect_attempts = max_reconnect_attempts
        self._conversation_id: typing.Optional[str] = None
        self._reconnect_attempts = 0
        self._user_closed = False
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Check if there's supposed to be a lock in the sync version
rg -n "threading.Lock" src/phonic/conversations/reconnectable_socket_client.py
Repository: Phonic-Co/phonic-python
Length of output: 49
🏁 Script executed:
# First, let's see the full file structure
wc -l src/phonic/conversations/reconnectable_socket_client.py
Repository: Phonic-Co/phonic-python
Length of output: 126
🏁 Script executed:
# Check the async version mentioned in the review
rg -n "class ReconnectableAsyncConversationsSocketClient" src/phonic/conversations/reconnectable_socket_client.py -A 30
Repository: Phonic-Co/phonic-python
Length of output: 1380
🏁 Script executed:
# Check if asyncio.Lock is present in the async version
rg -n "asyncio.Lock" src/phonic/conversations/reconnectable_socket_client.py
Repository: Phonic-Co/phonic-python
Length of output: 105
🏁 Script executed:
# Check for threading imports or any synchronization
rg -n "import threading|from threading" src/phonic/conversations/reconnectable_socket_client.py
Repository: Phonic-Co/phonic-python
Length of output: 49
🏁 Script executed:
# Read the async version methods, especially close() and recv()
sed -n '48,197p' src/phonic/conversations/reconnectable_socket_client.py
Repository: Phonic-Co/phonic-python
Length of output: 5770
🏁 Script executed:
# Read the sync version methods
sed -n '198,341p' src/phonic/conversations/reconnectable_socket_client.py
Repository: Phonic-Co/phonic-python
Length of output: 5357
Add threading lock to synchronize close() and reconnect operations.
The async version protects against race conditions using asyncio.Lock() in both close() and recv() methods. The sync version has equivalent critical sections but no synchronization mechanism. This creates a potential race condition: if close() is called from one thread while recv() is reconnecting in another, state modifications to self._cm and related fields can interleave unsafely.
Add self._lock = threading.Lock() in __init__ and use it to guard the same critical sections:
- Wrap the self._close_current_connection() call in close()
- Wrap the reconnect block in recv() (lines 282-283) to ensure the self._user_closed check and self._reconnect() call are atomic
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/phonic/conversations/reconnectable_socket_client.py` around lines 198 -
220, Add a threading lock to the sync reconnectable client to prevent races: in
ReconnectableConversationsSocketClient.__init__ create self._lock =
threading.Lock(), then use this lock to guard the critical section in close()
around the call to self._close_current_connection(), and likewise guard the
reconnect block in recv() so the check of self._user_closed and the subsequent
call to self._reconnect() execute atomically; this mirrors the asyncio.Lock
protection used in the async version and should reference the methods __init__,
close, recv, _close_current_connection, and _reconnect.
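A minimal sketch of the locking pattern requested above, under the assumption that close() and the reconnect step are the only two places that touch the connection state. `LockedReconnector` and `reconnect_if_open` are hypothetical names, and setting `_conn` to None stands in for `_close_current_connection()`.

```python
import threading
import typing


class LockedReconnector:
    """Hypothetical sync client skeleton: close() and reconnect share a lock."""

    def __init__(self, connect: typing.Callable[[], object]) -> None:
        self._connect = connect
        self._conn: typing.Optional[object] = connect()
        self._user_closed = False
        self._lock = threading.Lock()

    def close(self) -> None:
        with self._lock:
            self._user_closed = True
            self._conn = None  # stands in for _close_current_connection()

    def reconnect_if_open(self) -> bool:
        """Return True if a new connection was opened, False if user-closed."""
        with self._lock:
            # The check and the reconnect happen under one lock, so close()
            # cannot slip in between them.
            if self._user_closed:
                return False
            self._conn = self._connect()
            return True
```

The backoff sleep is deliberately left outside the lock in this pattern, so a concurrent close() is never blocked for the length of a retry delay.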
…ners Wrap initial WebSocket connection in try/except InvalidWebSocketStatus for both sync and async reconnectable socket openers, matching the behavior of the non-reconnectable versions. Co-Authored-By: Claude Opus 4.6 (1M context) <[email protected]>
Actionable comments posted: 2
🧹 Nitpick comments (1)
src/phonic/conversations/websocket_connect.py (1)
129-145: Use safer context acquisition to prevent leaks if wrapper construction fails.
Both reconnectable openers enter the websocket context before creating the reconnectable client wrapper. If wrapper construction raises, the already-open context may not be closed.
♻️ Suggested lifecycle hardening
 @contextmanager
 def open_reconnectable_conversations_socket_sync(
@@
-    try:
-        cm = websockets_sync_client.connect(ws_url, additional_headers=headers)
-        protocol = cm.__enter__()
-    except InvalidWebSocketStatus as exc:
-        _raise_api_error_for_invalid_websocket_status(exc, headers)
-    client = ReconnectableConversationsSocketClient(
-        initial_cm=cm,
-        initial_protocol=protocol,
-        client_wrapper=client_wrapper,
-        downstream_websocket_url=downstream_websocket_url,
-        request_options=request_options,
-        max_reconnect_attempts=max_reconnect_attempts,
-    )
+    cm = None
+    try:
+        cm = websockets_sync_client.connect(ws_url, additional_headers=headers)
+        protocol = cm.__enter__()
+        client = ReconnectableConversationsSocketClient(
+            initial_cm=cm,
+            initial_protocol=protocol,
+            client_wrapper=client_wrapper,
+            downstream_websocket_url=downstream_websocket_url,
+            request_options=request_options,
+            max_reconnect_attempts=max_reconnect_attempts,
+        )
+    except InvalidWebSocketStatus as exc:
+        _raise_api_error_for_invalid_websocket_status(exc, headers)
+    except Exception:
+        if cm is not None:
+            cm.__exit__(None, None, None)
+        raise
Also applies to: 163-179
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/phonic/conversations/websocket_connect.py` around lines 129 - 145, The code currently calls protocol = cm.__enter__() before constructing ReconnectableConversationsSocketClient, which can leak the open websocket if the wrapper construction raises; change to acquire the context safely by entering the context inside a try block and ensuring cm.__exit__ (or cm.close) is called if ReconnectableConversationsSocketClient construction fails — e.g., call cm = websockets_sync_client.connect(...), then try: protocol = cm.__enter__(); try: client = ReconnectableConversationsSocketClient(initial_cm=cm, initial_protocol=protocol, ...); except Exception: cm.__exit__(None, None, None); raise; finally yield/return as before — apply the same safe-enter-and-cleanup pattern to the other similar block (lines 163-179) so no opened context is leaked if wrapper construction throws.
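One alternative way to get the same cleanup guarantee is the standard library's contextlib.ExitStack, which is a different technique from the manual `__exit__` call suggested above. The sketch below is illustrative only: `FakeConnection` and `open_wrapped` are hypothetical names, and the wrapper is assumed to take over responsibility for closing the connection once construction succeeds.

```python
import contextlib
import typing


class FakeConnection:
    """Hypothetical context manager standing in for a WebSocket connection."""

    def __init__(self) -> None:
        self.closed = False

    def __enter__(self) -> "FakeConnection":
        return self

    def __exit__(self, *exc_info: object) -> None:
        self.closed = True


def open_wrapped(
    conn: FakeConnection,
    build_wrapper: typing.Callable[[FakeConnection], typing.Any],
) -> typing.Any:
    """Enter conn, build the wrapper, and close conn if the build fails."""
    with contextlib.ExitStack() as stack:
        protocol = stack.enter_context(conn)
        wrapper = build_wrapper(protocol)  # may raise; the stack then closes conn
        stack.pop_all()  # success: detach cleanup so conn stays open
        return wrapper
```

After `pop_all()`, nothing in `open_wrapped` closes the connection any more, so this only makes sense when the returned wrapper owns the teardown.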
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/phonic/conversations/websocket_connect.py`:
- Around line 53-56: The code copies request headers from
client_wrapper.get_headers() into the headers variable and then includes that
headers dict in raised ApiError objects, which can leak sensitive values (e.g.,
Authorization); fix by creating a redacted copy before any error construction or
logging: make a shallow copy of headers, replace sensitive keys like
"Authorization", "Proxy-Authorization", "Cookie", and "Set-Cookie" with a
placeholder (e.g., "[REDACTED]"), and use that redacted copy when instantiating
ApiError or any telemetry/log payload while leaving the original headers intact
for the actual request; apply the same redaction behavior to the other block
referenced (lines 59-73) where headers are propagated into exceptions.
- Around line 21-24: The async connector uses websockets_client_connect with the
wrong header kwarg for newer websockets versions (it passes
extra_headers=headers), causing runtime errors when the fallback import is the
modern connect which expects additional_headers; update the code to call
websockets_client_connect with the correct parameter by wrapping the imported
connect in a compatibility shim: create a small wrapper around
websockets_client_connect (e.g., normalize_websocket_connect) that detects
whether the callable accepts extra_headers or additional_headers (or tries one
then falls back to the other) and then use that wrapper wherever
websockets_client_connect is currently called (references:
websockets_client_connect, the async connect sites around the async connect
functions using headers on lines ~108 and ~164).
📒 Files selected for processing (1)
src/phonic/conversations/websocket_connect.py
try:
    from websockets.legacy.client import connect as websockets_client_connect  # type: ignore
except ImportError:
    from websockets import connect as websockets_client_connect  # type: ignore
🧩 Analysis chain
🌐 Web query:
In the latest websockets Python docs, what keyword argument does websockets.connect use for custom headers (extra_headers or additional_headers), and is websockets.legacy.client.connect still the recommended import path?
💡 Result:
In the latest websockets Python documentation (version 16.0 as of 2026), websockets.connect uses the keyword argument additional_headers for custom headers. Since version 14.0, websockets.connect is an alias for the new asyncio implementation, websockets.asyncio.client.connect. The legacy client websockets.legacy.client.connect uses extra_headers and is deprecated (deprecated in v14.0, to be removed by 2030). It is not the recommended import path; upgrade to the new asyncio implementation.
Citations:
- 1: https://websockets.readthedocs.io/en/latest/faq/client.html
- 2: https://websockets.readthedocs.io/en/stable/reference/legacy/client.html
- 3: https://websockets.readthedocs.io/en/stable/faq/client.html
- 4: https://websockets.readthedocs.io/en/15.0.1/reference/asyncio/client.html
- 5: https://websockets.readthedocs.io/en/14.2/faq/client.html
- 6: https://websockets.readthedocs.io/en/14.0/reference/asyncio/client.html
- 7: https://websockets.readthedocs.io/en/stable/
- 8: https://websockets.readthedocs.io/en/stable/reference/asyncio/client.html
- 9: https://websockets.readthedocs.io/en/13.0.1/reference/sync/client.html
🏁 Script executed:
cat -n src/phonic/conversations/websocket_connect.py | head -180
Repository: Phonic-Co/phonic-python
Length of output: 8088
Async WebSocket connect uses incorrect header parameter for newer websockets versions.
Lines 108 and 164 use extra_headers=headers with websockets_client_connect, which will fail at runtime if the fallback import resolves to the modern websockets.connect (which expects additional_headers=). The sync client correctly uses additional_headers= on lines 89 and 130. This inconsistency breaks async socket creation on versions where websockets.legacy.client is unavailable.
🛠️ Compatibility-safe wrapper
+import inspect
+
+def _build_async_connect_kwargs(
+ headers: typing.Dict[str, str],
+) -> typing.Dict[str, typing.Dict[str, str]]:
+ params = inspect.signature(websockets_client_connect).parameters
+ if "extra_headers" in params:
+ return {"extra_headers": headers}
+ return {"additional_headers": headers}
+
...
- async with websockets_client_connect(ws_url, extra_headers=headers) as protocol:
+ async with websockets_client_connect(
+ ws_url, **_build_async_connect_kwargs(headers)
+ ) as protocol:
yield AsyncConversationsSocketClient(websocket=protocol)
...
- cm = websockets_client_connect(ws_url, extra_headers=headers)
+ cm = websockets_client_connect(ws_url, **_build_async_connect_kwargs(headers))
 protocol = await cm.__aenter__()
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/phonic/conversations/websocket_connect.py` around lines 21 - 24, The
async connector uses websockets_client_connect with the wrong header kwarg for
newer websockets versions (it passes extra_headers=headers), causing runtime
errors when the fallback import is the modern connect which expects
additional_headers; update the code to call websockets_client_connect with the
correct parameter by wrapping the imported connect in a compatibility shim:
create a small wrapper around websockets_client_connect (e.g.,
normalize_websocket_connect) that detects whether the callable accepts
extra_headers or additional_headers (or tries one then falls back to the other)
and then use that wrapper wherever websockets_client_connect is currently called
(references: websockets_client_connect, the async connect sites around the async
connect functions using headers on lines ~108 and ~164).
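The wrapper form mentioned in the prompt above could be sketched as follows. `legacy_style_connect` and `modern_style_connect` are hypothetical stand-ins for the two connect variants, not the real websockets functions. Signature inspection is also an assumption: it would not find the keyword on a callable that only accepts `**kwargs`.

```python
import inspect
import typing

Headers = typing.Dict[str, str]


def legacy_style_connect(
    uri: str, *, extra_headers: typing.Optional[Headers] = None
) -> typing.Dict[str, typing.Any]:
    """Hypothetical stand-in for a connect() that takes extra_headers."""
    return {"uri": uri, "headers": dict(extra_headers or {})}


def modern_style_connect(
    uri: str, *, additional_headers: typing.Optional[Headers] = None
) -> typing.Dict[str, typing.Any]:
    """Hypothetical stand-in for a connect() that takes additional_headers."""
    return {"uri": uri, "headers": dict(additional_headers or {})}


def normalize_websocket_connect(
    connect: typing.Callable[..., typing.Any],
) -> typing.Callable[[str, Headers], typing.Any]:
    """Return a callable that always takes (uri, headers)."""
    params = inspect.signature(connect).parameters
    if "extra_headers" in params:
        kwarg = "extra_headers"
    elif "additional_headers" in params:
        kwarg = "additional_headers"
    else:
        raise TypeError("connect() accepts neither header keyword")

    def connect_with_headers(uri: str, headers: Headers) -> typing.Any:
        # Forward the headers under whichever keyword the callable declares.
        return connect(uri, **{kwarg: headers})

    return connect_with_headers
```

Resolving the keyword once, when the wrapper is created, keeps each call site free of version checks.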
    headers = client_wrapper.get_headers()
    if request_options and "additional_headers" in request_options:
        headers.update(request_options["additional_headers"])
    return ws_url, headers
Redact sensitive request headers before raising ApiError.
headers here are request headers (including Authorization from BaseClientWrapper.get_headers()), and they are copied into ApiError. This can leak bearer tokens via logs/telemetry when exceptions are serialized.
🔐 Proposed fix
+SENSITIVE_HEADER_KEYS = {
+ "authorization",
+ "proxy-authorization",
+ "cookie",
+ "set-cookie",
+ "x-api-key",
+}
+
+
+def _redact_headers(headers: typing.Dict[str, str]) -> typing.Dict[str, str]:
+ return {
+ key: ("<redacted>" if key.lower() in SENSITIVE_HEADER_KEYS else value)
+ for key, value in headers.items()
+ }
+
+
def _raise_api_error_for_invalid_websocket_status(
exc: InvalidWebSocketStatus, headers: typing.Dict[str, str]
) -> typing.NoReturn:
status_code: int = get_status_code(exc)
if status_code == 401:
raise ApiError(
status_code=status_code,
- headers=dict(headers),
+ headers=_redact_headers(headers),
body="Websocket initialized with invalid credentials.",
)
raise ApiError(
status_code=status_code,
- headers=dict(headers),
+ headers=_redact_headers(headers),
body="Unexpected error when initializing websocket connection.",
 )
Also applies to: 59-73
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/phonic/conversations/websocket_connect.py` around lines 53 - 56, The code
copies request headers from client_wrapper.get_headers() into the headers
variable and then includes that headers dict in raised ApiError objects, which
can leak sensitive values (e.g., Authorization); fix by creating a redacted copy
before any error construction or logging: make a shallow copy of headers,
replace sensitive keys like "Authorization", "Proxy-Authorization", "Cookie",
and "Set-Cookie" with a placeholder (e.g., "[REDACTED]"), and use that redacted
copy when instantiating ApiError or any telemetry/log payload while leaving the
original headers intact for the actual request; apply the same redaction
behavior to the other block referenced (lines 59-73) where headers are
propagated into exceptions.
Catch exceptions from _reconnect() (e.g. ConnectionRefusedError, OSError) so the retry loop continues instead of breaking permanently. The next recv() call on the dead socket will raise ConnectionClosed, triggering another reconnect attempt. Co-Authored-By: Claude Opus 4.6 (1M context) <[email protected]>
Server responds with 4800 (session expired) or 4801 (invalid state) when the session is gone. Client retries with backoff until it gets one of these terminal codes, instead of using a client-side max. Co-Authored-By: Claude Opus 4.6 (1M context) <[email protected]>
Server terminal codes (4800/4801) stop retries in normal operation. This cap is a circuit breaker for when the server is completely unreachable and never responds with a close code. Co-Authored-By: Claude Opus 4.6 (1M context) <[email protected]>
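The retry policy described in the three commit messages above can be summarized in a small decision function. This is a sketch under assumptions: `should_retry` is a hypothetical name, and using `None` to represent a failed connect attempt that produced no close code is a modeling choice made for this example. The codes 1006, 4800, and 4801 and the cap of 30 come from this PR.

```python
import typing

ABNORMAL_CLOSE_CODE = 1006
TERMINAL_CLOSE_CODES = frozenset({4800, 4801})  # session expired / invalid state


def should_retry(
    close_code: typing.Optional[int], attempts: int, max_attempts: int = 30
) -> bool:
    """Decide whether another reconnect attempt should be made.

    close_code is None when a connect attempt failed without any close code
    (for example, the server was unreachable).
    """
    if close_code in TERMINAL_CLOSE_CODES:
        return False  # the server says the session is gone
    if attempts >= max_attempts:
        return False  # circuit breaker for an unreachable server
    return close_code is None or close_code == ABNORMAL_CLOSE_CODE
```

In normal operation the terminal codes end the loop; the attempt cap only matters when no close code ever arrives.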
        msg = await self.recv()
        yield msg
    except ConnectionClosed:
        break
Iterator hides failed websocket reconnects
Medium Severity
ReconnectableAsyncConversationsSocketClient.__aiter__ and ReconnectableConversationsSocketClient.__iter__ catch ConnectionClosed and break, so abnormal closures become silent end-of-stream. When reconnect is impossible or retries are exhausted, for/async for consumers cannot detect failure and may treat a dropped connection as a normal conversation end.
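One possible shape for an iterator that does not hide failures is sketched below: it ends quietly on a normal close and re-raises everything else. `FakeClosed`, `iter_messages`, and `scripted` are hypothetical names, and treating codes 1000 and 1001 as the normal closes is an assumption for this example.

```python
import typing

NORMAL_CLOSE_CODES = frozenset({1000, 1001})  # assumption: normal / going away


class FakeClosed(Exception):
    """Hypothetical stand-in for ConnectionClosed."""

    def __init__(self, code: int) -> None:
        super().__init__(f"closed with code {code}")
        self.code = code


def iter_messages(recv: typing.Callable[[], str]) -> typing.Iterator[str]:
    """Yield messages; stop on a normal close, propagate any other close."""
    while True:
        try:
            yield recv()
        except FakeClosed as exc:
            if exc.code in NORMAL_CLOSE_CODES:
                return  # clean end of stream
            raise  # abnormal close: let the consumer see the failure


def scripted(items: typing.List[typing.Union[str, int]]) -> typing.Callable[[], str]:
    """Build a recv() that replays messages and raises on integer close codes."""
    queue = list(items)

    def recv() -> str:
        item = queue.pop(0)
        if isinstance(item, int):
            raise FakeClosed(item)
        return item

    return recv
```

With this shape, a `for` loop that finishes without an exception means the conversation really ended, and a dropped connection surfaces as an error the caller can handle.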
        await self._inner.send_generate_reply(message)

    async def send_say(self, message: typing.Any) -> None:
        await self._inner.send_say(message)
Reconnect never triggers on send failures
Medium Severity
The reconnect wrapper only retries inside recv(). All send_* methods directly call self._inner.send_*, so a dropped socket during outbound audio/config traffic raises ConnectionClosed immediately and no reconnect is attempted. This makes automatic reconnection fail during send-heavy phases.
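A send path that takes part in reconnection might be sketched like this. `SendWithReconnect`, `FakeSendSocket`, and `FakeSendClosed` are hypothetical names. Whether replaying the failed message after a reconnect is safe for audio or config traffic is an open assumption, not something this PR establishes.

```python
import typing


class FakeSendClosed(Exception):
    """Hypothetical stand-in for ConnectionClosed raised from send()."""

    def __init__(self, code: int) -> None:
        super().__init__(f"closed with code {code}")
        self.code = code


class FakeSendSocket:
    """Hypothetical socket that either records sends or fails with a code."""

    def __init__(self, fail_code: typing.Optional[int] = None) -> None:
        self.sent: typing.List[str] = []
        self._fail_code = fail_code

    def send(self, message: str) -> None:
        if self._fail_code is not None:
            raise FakeSendClosed(self._fail_code)
        self.sent.append(message)


class SendWithReconnect:
    """Retry a send on a fresh connection after an abnormal close (1006)."""

    def __init__(
        self, connect: typing.Callable[[], FakeSendSocket], max_attempts: int = 3
    ) -> None:
        self._connect = connect
        self._sock = connect()
        self._max_attempts = max_attempts

    def send(self, message: str) -> None:
        attempts = 0
        while True:
            try:
                self._sock.send(message)
                return
            except FakeSendClosed as exc:
                if exc.code != 1006 or attempts >= self._max_attempts:
                    raise
                attempts += 1
                # Assumption: replaying the same message is acceptable.
                self._sock = self._connect()
```

A real implementation would likely share the lock, backoff, and attempt counter with recv() rather than keeping a separate counter per send.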
♻️ Duplicate comments (1)
src/phonic/conversations/reconnectable_socket_client.py (1)
212-232: ⚠️ Potential issue | 🟠 Major
Sync version lacks threading.Lock for race condition protection.
The async version uses asyncio.Lock() (line 75) to protect close() and the reconnect block in recv(). The sync version has no equivalent synchronization. If close() is called from one thread while recv() is in the reconnect path on another thread, state corruption can occur.
🔒 Add threading.Lock similar to async version
+import threading
+
 class ReconnectableConversationsSocketClient(EventEmitterMixin):
     """Sync conversations socket with automatic reconnect after close code 1006."""

     def __init__(
         self,
         *,
         initial_cm: typing.Any,
         initial_protocol: typing.Any,
         client_wrapper: BaseClientWrapper,
         downstream_websocket_url: typing.Optional[str],
         request_options: typing.Optional[RequestOptions],
     ) -> None:
         super().__init__()
         self._cm = initial_cm
         self._inner = ConversationsSocketClient(websocket=initial_protocol)
         self._client_wrapper = client_wrapper
         self._downstream_websocket_url = downstream_websocket_url
         self._request_options = request_options
         self._conversation_id: typing.Optional[str] = None
         self._reconnect_attempts = 0
         self._user_closed = False
+        self._lock = threading.Lock()
Then update close() and the reconnect block in recv() to use with self._lock:.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/phonic/conversations/reconnectable_socket_client.py` around lines 212 - 232, The sync ReconnectableConversationsSocketClient is missing a threading lock to protect concurrent access to state during close() and the reconnect path in recv(); add self._lock = threading.Lock() in __init__ and guard the critical sections in close() and the reconnect branch inside recv() with with self._lock: so mutations and checks of _user_closed, _reconnect_attempts, and swapping/assigning self._inner (and any associated reconnect logic using _downstream_websocket_url/_request_options) are atomic and race-free.
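The locking pattern requested here can be sketched in isolation. This is an illustrative reduction rather than the SDK class: `LockedReconnectState`, `swap_inner`, and the `connect` callable are hypothetical names, and only the attribute names `_user_closed`, `_reconnect_attempts`, `_inner`, and `_lock` mirror the review comment.

```python
import threading
import typing


class LockedReconnectState:
    """Hypothetical reduction of the sync client to the state the lock protects."""

    def __init__(self, inner: typing.Any) -> None:
        self._inner = inner
        self._user_closed = False
        self._reconnect_attempts = 0
        self._lock = threading.Lock()

    def close(self) -> None:
        # Mark the socket as user-closed and close it atomically.
        with self._lock:
            self._user_closed = True
            self._inner.close()

    def swap_inner(self, connect: typing.Callable[[], typing.Any]) -> bool:
        """Reconnect unless the user already closed; return True if swapped."""
        with self._lock:
            if self._user_closed:
                return False
            self._reconnect_attempts += 1
            self._inner = connect()
            return True
```

Holding the lock across `connect()` means `close()` blocks until the reconnect attempt returns. That trade-off is acceptable in a sketch, but a real client might prefer to re-check `_user_closed` after connecting instead.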
🧹 Nitpick comments (2)
src/phonic/conversations/reconnectable_socket_client.py (1)
94-97: Consider logging exceptions at debug level instead of silent pass.
Multiple locations silently swallow exceptions (lines 96-97, 108-109, 159-164). While this is intentional for resilience, adding debug-level logging would aid troubleshooting without affecting normal operation.
🔧 Example for lines 94-97
 elif hasattr(msg, "dict"):
     try:
         self._observe_json(msg.dict())
     except Exception:
-        pass
+        _logger.debug("Failed to observe message", exc_info=True)
This requires adding a logger at module level:
import logging
_logger = logging.getLogger(__name__)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/phonic/conversations/reconnectable_socket_client.py` around lines 94 - 97, Add a module-level logger and replace the silent excepts with debug-level logs that include exception info: add "import logging" and "_logger = logging.getLogger(__name__)" at top of reconnectable_socket_client.py, and in the try/except blocks that swallow errors (the one wrapping self._observe_json(msg.dict()) and the other excepts around socket/message handling) change "except Exception: pass" to calls like _logger.debug("Exception in <method_or_action> while processing %s", variable_or_context, exc_info=True) so each catch logs context (e.g., method name and the msg or payload) and exc_info=True for a stacktrace; keep the exception handling behavior otherwise unchanged.
src/phonic/conversations/client.py (1)
519-522: Docstring return type doesn't reflect Union with ReconnectableConversationsSocketClient.
The actual return type annotation (line 506) is Union[ConversationsSocketClient, ReconnectableConversationsSocketClient], but the docstring only mentions ConversationsSocketClient.
📝 Update docstring
 Returns
 -------
-ConversationsSocketClient
+Union[ConversationsSocketClient, ReconnectableConversationsSocketClient]
+    Returns ReconnectableConversationsSocketClient when reconnect_conversation_on_abnormal_disconnect is enabled.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/phonic/conversations/client.py` around lines 519 - 522, The docstring's "Returns" section currently lists only ConversationsSocketClient but the function's return annotation is Union[ConversationsSocketClient, ReconnectableConversationsSocketClient]; update the docstring to reflect this union (mention both ConversationsSocketClient and ReconnectableConversationsSocketClient and that either may be returned) so it matches the type annotation (e.g., in the function that returns Union[ConversationsSocketClient, ReconnectableConversationsSocketClient]).
📒 Files selected for processing (4)
- src/phonic/conversations/client.py
- src/phonic/conversations/raw_client.py
- src/phonic/conversations/reconnectable_socket_client.py
- src/phonic/conversations/websocket_connect.py
🚧 Files skipped from review as they are similar to previous changes (2)
- src/phonic/conversations/raw_client.py
- src/phonic/conversations/websocket_connect.py
- Make _recv_with_reconnect recursive instead of while-True loop
- Add _reconnecting flag and _is_send_safe() to silently drop sends during reconnect
- Pass ConnectionClosed from reconnect failures through _should_reconnect for proper 4800/4801 handling
- Simplify _close_code to use exc.rcvd.code directly (deprecated .code removed)
- Make _should_reconnect a regular method (not async)
Co-Authored-By: Claude Opus 4.6 (1M context) <[email protected]>
Actionable comments posted: 3
🧹 Nitpick comments (1)
src/phonic/conversations/reconnectable_socket_client.py (1)
27-36: Consider using TERMINAL_RECONNECT_CODES explicitly or documenting its purpose.
The constant TERMINAL_RECONNECT_CODES is defined but never referenced in _should_reconnect. The logic works correctly (only 1006 triggers reconnect, so 4800/4801 are implicitly handled), but the dead constant may confuse maintainers.
Consider either:
- Adding a comment explaining it exists for documentation/future use, or
- Using it explicitly in _should_reconnect for clarity:
♻️ Option 2: Use the constant explicitly
 def _should_reconnect(self, exc: BaseException) -> bool:
     if self._user_closed:
         return False
-    if _close_code(exc) != ABNORMAL_CLOSURE:
+    code = _close_code(exc)
+    if code != ABNORMAL_CLOSURE or code in TERMINAL_RECONNECT_CODES:
         return False
     if not self._conversation_id:
         return False
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/phonic/conversations/reconnectable_socket_client.py` around lines 27 - 36, TERMINAL_RECONNECT_CODES is defined but never referenced, which is confusing; update the _should_reconnect function to explicitly check the close code against TERMINAL_RECONNECT_CODES (and still treat ABNORMAL_CLOSURE specially) so terminal server codes (4800/4801) are clearly handled, or alternatively add a one-line comment next to the constant explaining it’s intentionally kept for documentation/future use; prefer the explicit check in _should_reconnect to make the logic and intent clear.
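The explicit check preferred above might look like the standalone function below. It is a sketch under assumptions: the real `_should_reconnect` is a method taking an exception, whereas this hypothetical `should_reconnect` takes the already-extracted close code and the relevant state as arguments so it can run on its own.

```python
import typing

ABNORMAL_CLOSURE = 1006
# Server close codes that mean the session is gone: stop retrying.
TERMINAL_RECONNECT_CODES = frozenset({4800, 4801})


def should_reconnect(
    close_code: typing.Optional[int],
    *,
    user_closed: bool,
    conversation_id: typing.Optional[str],
) -> bool:
    """Decide whether a closed socket should be reconnected."""
    if user_closed:
        return False
    if close_code in TERMINAL_RECONNECT_CODES:
        return False  # session expired or invalid state: never retry
    if close_code != ABNORMAL_CLOSURE:
        return False  # only abnormal closure (1006) is recoverable
    return bool(conversation_id)  # resuming needs a known conversation
```

The terminal-code branch is redundant with the 1006 check today, but naming it keeps the intent visible if the set of reconnectable codes ever grows.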
📒 Files selected for processing (4)
- src/phonic/conversations/client.py
- src/phonic/conversations/raw_client.py
- src/phonic/conversations/reconnectable_socket_client.py
- src/phonic/conversations/websocket_connect.py
✅ Files skipped from review due to trivial changes (1)
- src/phonic/conversations/websocket_connect.py
🚧 Files skipped from review as they are similar to previous changes (1)
- src/phonic/conversations/client.py
        Returns
        -------
        ConversationsSocketClient
        """
Docstring doesn't reflect union return type.
The Returns section still says ConversationsSocketClient but the actual return type annotation is Union[ConversationsSocketClient, ReconnectableConversationsSocketClient]. Consider updating the docstring to document both possible return types.
📝 Suggested fix
Returns
-------
- ConversationsSocketClient
+ Union[ConversationsSocketClient, ReconnectableConversationsSocketClient]
+ ConversationsSocketClient when reconnect is disabled,
+ ReconnectableConversationsSocketClient when reconnect is enabled.
"""
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/phonic/conversations/raw_client.py` around lines 1133 - 1136, The
docstring for the factory/constructor that currently lists the return as
ConversationsSocketClient must be updated to reflect the annotated return type
Union[ConversationsSocketClient, ReconnectableConversationsSocketClient]; edit
the Returns section in the docstring for the function/method around where
ConversationsSocketClient is documented to mention both possible return types
(ConversationsSocketClient and ReconnectableConversationsSocketClient), briefly
describe when each is returned, and keep wording consistent with existing
docstring style.
        Returns
        -------
        AsyncConversationsSocketClient
        """
Docstring doesn't reflect union return type.
Same issue as the sync version - the Returns section says AsyncConversationsSocketClient but the actual return type is a union including ReconnectableAsyncConversationsSocketClient.
📝 Suggested fix
Returns
-------
- AsyncConversationsSocketClient
+ Union[AsyncConversationsSocketClient, ReconnectableAsyncConversationsSocketClient]
+ AsyncConversationsSocketClient when reconnect is disabled,
+ ReconnectableAsyncConversationsSocketClient when reconnect is enabled.
"""
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/phonic/conversations/raw_client.py` around lines 2234 - 2237, The Returns
docstring currently lists only AsyncConversationsSocketClient but the function
actually returns a union including ReconnectableAsyncConversationsSocketClient;
update the Returns section to reflect the union type (e.g.,
"AsyncConversationsSocketClient | ReconnectableAsyncConversationsSocketClient")
and adjust any phrasing to mention both variants, and also ensure the
function/method signature/type hint (if present) aligns with that union; look
for the docstring around the AsyncConversationsSocketClient return block and
update it accordingly.
    async def send_config(self, message: typing.Any) -> None:
        if self._is_send_safe():
            await self._inner.send_config(message)

    async def send_audio_chunk(self, message: typing.Any) -> None:
        if self._is_send_safe():
            await self._inner.send_audio_chunk(message)

    async def send_update_system_prompt(self, message: typing.Any) -> None:
        if self._is_send_safe():
            await self._inner.send_update_system_prompt(message)

    async def send_add_system_message(self, message: typing.Any) -> None:
        if self._is_send_safe():
            await self._inner.send_add_system_message(message)

    async def send_set_external_id(self, message: typing.Any) -> None:
        if self._is_send_safe():
            await self._inner.send_set_external_id(message)

    async def send_tool_call_output(self, message: typing.Any) -> None:
        if self._is_send_safe():
            await self._inner.send_tool_call_output(message)

    async def send_generate_reply(self, message: typing.Any) -> None:
        if self._is_send_safe():
            await self._inner.send_generate_reply(message)

    async def send_say(self, message: typing.Any) -> None:
        if self._is_send_safe():
            await self._inner.send_say(message)
Silent message drops during reconnect may surprise callers.
All send_* methods silently return when _is_send_safe() is False (during reconnection or after close). While this is intentional per the PR objectives, callers have no way to know their messages were dropped.
Consider either:
- Returning a boolean indicating whether the send succeeded
- Raising an exception when attempting to send during reconnect
- Documenting this behavior prominently in the class/method docstrings
For audio chunks, silent drops are likely acceptable. For critical messages like send_config or send_tool_call_output, silent drops could cause subtle application bugs.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/phonic/conversations/reconnectable_socket_client.py` around lines 193 -
224, The send_* methods (send_config, send_tool_call_output,
send_generate_reply, send_set_external_id, send_update_system_prompt,
send_add_system_message, send_say, and send_audio_chunk) silently drop messages
when _is_send_safe() is False; change their behavior to make drops observable by
callers: update the signatures to return bool (True if message was forwarded,
False if dropped) for all methods except send_audio_chunk (keep silent
behavior), and inside each method check _is_send_safe() and return False
immediately when unsafe, otherwise await the corresponding _inner.<method>(...)
and return True; also update the class/method docstrings to document this return
value and the reconnection semantics so callers know which sends may be dropped.
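The boolean-return variant described in this prompt can be sketched as follows. `DropAwareSender` and its `inner` argument are hypothetical; only the method names `send_config`, `send_audio_chunk`, and `_is_send_safe` and the `_reconnecting` flag come from the review thread, and the `_user_closed` check is an assumption about what "safe" means.

```python
import typing


class DropAwareSender:
    """Hypothetical wrapper; `inner` is any object with async send_* methods."""

    def __init__(self, inner: typing.Any) -> None:
        self._inner = inner
        self._reconnecting = False
        self._user_closed = False

    def _is_send_safe(self) -> bool:
        return not (self._reconnecting or self._user_closed)

    async def send_config(self, message: typing.Any) -> bool:
        """Return True if the message was forwarded, False if it was dropped."""
        if not self._is_send_safe():
            return False
        await self._inner.send_config(message)
        return True

    async def send_audio_chunk(self, message: typing.Any) -> None:
        """Audio is lossy by nature, so drops stay silent here."""
        if self._is_send_safe():
            await self._inner.send_audio_chunk(message)
```

A caller that cares about delivery can then write `if not await sock.send_config(cfg): ...` and retry after the reconnect completes, while audio streaming code stays unchanged.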
Cursor Bugbot has reviewed your changes and found 2 potential issues.
There are 4 total unresolved issues (including 2 from previous reviews).
            except ConnectionClosed as reconnect_exc:
                return await self._recv_with_reconnect(reconnect_exc)
            except Exception:
                return await self._recv_with_reconnect(exc)
Async reconnect deadlocks on non-reentrant lock
High Severity
The recursive calls to _recv_with_reconnect on lines 162 and 164 happen inside the async with self._lock: block (acquired at line 154). Since asyncio.Lock is not reentrant, the recursive call will attempt to re-acquire self._lock and deadlock forever. This means any failed reconnection attempt (the very scenario reconnection logic exists for) will permanently hang the client.
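One way to avoid this deadlock is to acquire the lock once and retry in a loop instead of recursing while the lock is held. The sketch below assumes a hypothetical `LoopingReconnector` with an injected async `connect` callable and a placeholder `asyncio.sleep(0)` where real backoff would go. It is not the SDK's `_recv_with_reconnect`.

```python
import asyncio
import typing


class LoopingReconnector:
    """Hypothetical sketch; `connect` is any async callable returning a socket."""

    def __init__(
        self,
        connect: typing.Callable[[], typing.Awaitable[typing.Any]],
        max_attempts: int = 5,
    ) -> None:
        self._connect = connect
        self._max_attempts = max_attempts
        self._lock = asyncio.Lock()
        self.inner: typing.Any = None

    async def reconnect(self) -> None:
        # Acquire the lock once and loop inside it. asyncio.Lock is not
        # reentrant, so a recursive call here would wait on itself forever.
        async with self._lock:
            last_error: typing.Optional[BaseException] = None
            for _ in range(self._max_attempts):
                try:
                    self.inner = await self._connect()
                    return
                except OSError as exc:  # includes ConnectionRefusedError
                    last_error = exc
                    await asyncio.sleep(0)  # placeholder for real backoff
            raise ConnectionError("reconnect attempts exhausted") from last_error
```

An alternative with the same effect is to keep the recursion but move it outside the `async with` block, so the lock is released before the next attempt starts.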
ABNORMAL_CLOSURE = 1006
# Server close codes that mean the session is gone — stop retrying.
TERMINAL_RECONNECT_CODES = {4800, 4801}
Unused TERMINAL_RECONNECT_CODES constant is dead code
Low Severity
TERMINAL_RECONNECT_CODES is defined but never referenced anywhere in the codebase. The comment says these codes mean "the session is gone — stop retrying," but _should_reconnect only checks for ABNORMAL_CLOSURE (1006) and doesn't use this set. This is likely either a missed integration or leftover from the Node port.


Add reconnection logic to python sdk client
Note
Medium Risk
Changes WebSocket connection lifecycle and adds retry logic, which can affect real-time conversation stability and introduce reconnect loops or message loss if edge cases aren’t handled.
Overview
Adds an opt-in reconnect_conversation_on_abnormal_disconnect flag on Phonic/AsyncPhonic that enables automatic STS conversations WebSocket reconnection after abnormal closes (1006) using reconnect_conv_id.
Refactors conversations.connect() (and raw variants) to route through new shared helpers in websocket_connect.py, and introduces Reconnectable*ConversationsSocketClient wrappers that track conversation_id, backoff/retry reconnect attempts, and suppress sends while reconnecting. Updates .fernignore to prevent Fern from overwriting the new/modified client and WebSocket modules.
Written by Cursor Bugbot for commit a12be73. This will update automatically on new commits.
Summary by CodeRabbit
New Features
Chores