@@ -152,48 +152,46 @@ def _get_current_run_for_propagation() -> RunTree | None:
152152
153153
154154# ---------------------------------------------------------------------------
155- # Workflow event loop safety: patch @traceable's aio_to_thread
155+ # Workflow event loop safety: override @traceable's aio_to_thread
156156# ---------------------------------------------------------------------------
157157
158- _aio_to_thread_patched = False
158+ _aio_to_thread_override_installed = False
159159
160160
161- def _patch_aio_to_thread () -> None :
162- """Patch langsmith's ``aio_to_thread`` to run synchronously in workflows.
161+ async def _temporal_aio_to_thread (
162+ default_aio_to_thread : Callable [..., Any ],
163+ ctx : Any ,
164+ func : Callable [..., Any ],
165+ / ,
166+ * args : Any ,
167+ ** kwargs : Any ,
168+ ) -> Any :
169+ """Run LangSmith's ``aio_to_thread`` synchronously inside Temporal workflows.
163170
164171 The ``@traceable`` decorator on async functions uses ``aio_to_thread()`` →
165172 ``loop.run_in_executor()`` for run setup/teardown. The Temporal workflow
166- event loop does not support ``run_in_executor``. This patch runs those
167- functions synchronously on the workflow thread when inside a workflow.
168- Functions passed here must not perform blocking I/O .
173+ event loop does not support ``run_in_executor``. This override runs those
174+ functions synchronously on the workflow thread when inside a workflow,
175+ and delegates to the default implementation outside workflows .
169176
177+ Registered via ``langsmith.set_runtime_overrides(aio_to_thread=...)``.
170178 """
171- global _aio_to_thread_patched # noqa: PLW0603
172- if _aio_to_thread_patched :
173- return
174-
175- import langsmith ._internal ._aiter as _aiter
179+ if not temporalio .workflow .in_workflow ():
180+ return await default_aio_to_thread (ctx , func , * args , ** kwargs )
181+ with temporalio .workflow .unsafe .sandbox_unrestricted ():
182+ return ctx .run (func , * args , ** kwargs )
176183
177- _original = _aiter .aio_to_thread
178184
179- import contextvars
185+ def _install_aio_to_thread_override () -> None :
186+ """Install the ``aio_to_thread`` override via LangSmith's official API.
180187
181- async def _safe_aio_to_thread (
182- func : Callable [..., Any ],
183- / ,
184- * args : Any ,
185- __ctx : contextvars .Context | None = None ,
186- ** kwargs : Any ,
187- ) -> Any :
188- if not temporalio .workflow .in_workflow ():
189- return await _original (func , * args , __ctx = __ctx , ** kwargs )
190- with temporalio .workflow .unsafe .sandbox_unrestricted ():
191- # Run without ctx.run() so context var changes propagate
192- # to the caller. Safe because workflows are single-threaded.
193- return func (* args , ** kwargs )
194-
195- _aiter .aio_to_thread = _safe_aio_to_thread # type: ignore[assignment]
196- _aio_to_thread_patched = True
188+ Safe to call multiple times; the override is only installed once.
189+ """
190+ global _aio_to_thread_override_installed # noqa: PLW0603
191+ if _aio_to_thread_override_installed :
192+ return
193+ langsmith .set_runtime_overrides (aio_to_thread = _temporal_aio_to_thread )
194+ _aio_to_thread_override_installed = True
197195
198196
199197# ---------------------------------------------------------------------------
@@ -599,7 +597,7 @@ def workflow_interceptor_class(
599597 self , input : temporalio .worker .WorkflowInterceptorClassInput
600598 ) -> type [_LangSmithWorkflowInboundInterceptor ]:
601599 """Return the workflow interceptor class with config bound."""
602- _patch_aio_to_thread ()
600+ _install_aio_to_thread_override ()
603601 config = self
604602
605603 class InterceptorWithConfig (_LangSmithWorkflowInboundInterceptor ):
0 commit comments