@@ -154,48 +154,46 @@ def _get_current_run_for_propagation() -> RunTree | None:
154154
155155
156156# ---------------------------------------------------------------------------
157- # Workflow event loop safety: patch @traceable's aio_to_thread
157+ # Workflow event loop safety: override @traceable's aio_to_thread
158158# ---------------------------------------------------------------------------
159159
160- _aio_to_thread_patched = False
160+ _aio_to_thread_override_installed = False
161161
162162
163- def _patch_aio_to_thread () -> None :
164- """Patch langsmith's ``aio_to_thread`` to run synchronously in workflows.
163+ async def _temporal_aio_to_thread (
164+ default_aio_to_thread : Callable [..., Any ],
165+ ctx : Any ,
166+ func : Callable [..., Any ],
167+ / ,
168+ * args : Any ,
169+ ** kwargs : Any ,
170+ ) -> Any :
171+ """Run LangSmith's ``aio_to_thread`` synchronously inside Temporal workflows.
165172
166173 The ``@traceable`` decorator on async functions uses ``aio_to_thread()`` →
167174 ``loop.run_in_executor()`` for run setup/teardown. The Temporal workflow
168- event loop does not support ``run_in_executor``. This patch runs those
169- functions synchronously on the workflow thread when inside a workflow.
170- Functions passed here must not perform blocking I/O .
175+ event loop does not support ``run_in_executor``. This override runs those
176+ functions synchronously on the workflow thread when inside a workflow,
177+ and delegates to the default implementation outside workflows .
171178
179+ Registered via ``langsmith.set_runtime_overrides(aio_to_thread=...)``.
172180 """
173- global _aio_to_thread_patched # noqa: PLW0603
174- if _aio_to_thread_patched :
175- return
176-
177- import langsmith ._internal ._aiter as _aiter
181+ if not temporalio .workflow .in_workflow ():
182+ return await default_aio_to_thread (ctx , func , * args , ** kwargs )
183+ with temporalio .workflow .unsafe .sandbox_unrestricted ():
184+ return ctx .run (func , * args , ** kwargs )
178185
179- _original = _aiter .aio_to_thread
180186
181- import contextvars
187+ def _install_aio_to_thread_override () -> None :
188+ """Install the ``aio_to_thread`` override via LangSmith's official API.
182189
183- async def _safe_aio_to_thread (
184- func : Callable [..., Any ],
185- / ,
186- * args : Any ,
187- __ctx : contextvars .Context | None = None ,
188- ** kwargs : Any ,
189- ) -> Any :
190- if not temporalio .workflow .in_workflow ():
191- return await _original (func , * args , __ctx = __ctx , ** kwargs )
192- with temporalio .workflow .unsafe .sandbox_unrestricted ():
193- # Run without ctx.run() so context var changes propagate
194- # to the caller. Safe because workflows are single-threaded.
195- return func (* args , ** kwargs )
196-
197- _aiter .aio_to_thread = _safe_aio_to_thread # type: ignore[assignment]
198- _aio_to_thread_patched = True
190+ Safe to call multiple times; the override is only installed once.
191+ """
192+ global _aio_to_thread_override_installed # noqa: PLW0603
193+ if _aio_to_thread_override_installed :
194+ return
195+ langsmith .set_runtime_overrides (aio_to_thread = _temporal_aio_to_thread )
196+ _aio_to_thread_override_installed = True
199197
200198
201199# ---------------------------------------------------------------------------
@@ -611,7 +609,7 @@ def workflow_interceptor_class(
611609 self , input : temporalio .worker .WorkflowInterceptorClassInput
612610 ) -> type [_LangSmithWorkflowInboundInterceptor ]:
613611 """Return the workflow interceptor class with config bound."""
614- _patch_aio_to_thread ()
612+ _install_aio_to_thread_override ()
615613 config = self
616614
617615 class InterceptorWithConfig (_LangSmithWorkflowInboundInterceptor ):
0 commit comments