Status: Active Last Updated: 2026-05-15
Complete reference for the notebooklm Python library.
import asyncio
from notebooklm import NotebookLMClient
async def main():
# Create client from saved authentication
async with await NotebookLMClient.from_storage() as client:
# List notebooks
notebooks = await client.notebooks.list()
print(f"Found {len(notebooks)} notebooks")
# Create a new notebook
nb = await client.notebooks.create("My Research")
print(f"Created: {nb.id}")
# Add sources
await client.sources.add_url(nb.id, "https://example.com/article")
# Ask a question
result = await client.chat.ask(nb.id, "Summarize the main points")
print(result.answer)
# Generate a podcast
status = await client.artifacts.generate_audio(nb.id)
await client.artifacts.wait_for_completion(nb.id, status.task_id)
output_path = await client.artifacts.download_audio(nb.id, "podcast.mp3")
print(f"Audio saved to: {output_path}")
asyncio.run(main())NotebookLMClient is async re-entrant on a single event loop. You can freely await multiple operations concurrently via asyncio.gather or asyncio.TaskGroup:
notebooks, sources = await asyncio.gather(
client.notebooks.list(),
client.sources.list(notebook_id),
)The client is not thread-safe. Do not share a NotebookLMClient across threads or across multiple event loops. Create one client per loop. A loop-affinity guard raises a clear RuntimeError on the authed POST hot path if you do — see the Concurrency contract section below for the full guarantees, non-guarantees, and production patterns.
If we ever provide thread-safety, it will be a versioned, opt-in API change. Do not assume it.
The client must be used as an async context manager to properly manage HTTP connections:
# Correct - uses context manager
async with await NotebookLMClient.from_storage() as client:
...
# Also correct - manual management
client = await NotebookLMClient.from_storage()
await client.__aenter__()
try:
...
finally:
await client.__aexit__(None, None, None)The client requires valid Google session cookies obtained via browser login:
# From storage file (recommended)
client = await NotebookLMClient.from_storage()
client = await NotebookLMClient.from_storage("/path/to/storage_state.json")
# From a named profile
client = await NotebookLMClient.from_storage(profile="work")
# From AuthTokens directly
from notebooklm import AuthTokens
auth = AuthTokens(
cookies={"SID": "...", "HSID": "...", ...},
csrf_token="...",
session_id="..."
)
client = NotebookLMClient(auth)
# AuthTokens also supports profiles (from_storage is async)
auth = await AuthTokens.from_storage(profile="work")Building a storage state from existing browser cookies ([cookies] extra):
Install with the optional cookies extra to pull cookies from a locally installed browser via rookiepy — useful for headless environments where you cannot run Playwright (full extras matrix: docs/installation.md#optional-extras-matrix):
pip install "notebooklm-py[cookies]"import json
import os
import rookiepy
from notebooklm import NotebookLMClient
from notebooklm.auth import (
REQUIRED_COOKIE_DOMAINS,
convert_rookiepy_cookies_to_storage_state,
)
# Pull Google cookies from Chrome (or .firefox(), .edge(), .safari(), .load() for auto-detect).
# REQUIRED_COOKIE_DOMAINS mirrors the CLI's extraction set so rotation, media
# downloads, and Drive flows all have the cookies they need.
raw = rookiepy.chrome(domains=list(REQUIRED_COOKIE_DOMAINS))
storage_state = convert_rookiepy_cookies_to_storage_state(raw)
# Persist for future runs; restrict to owner-only on POSIX since this file holds auth cookies
storage_path = "/path/to/storage_state.json"
with open(storage_path, "w") as f:
json.dump(storage_state, f)
if os.name != "nt":
os.chmod(storage_path, 0o600)
async with await NotebookLMClient.from_storage(storage_path) as client:
notebooks = await client.notebooks.list()convert_rookiepy_cookies_to_storage_state(rookiepy_cookies) converts the
cookie list returned by rookiepy into the storage-state format
NotebookLMClient.from_storage() expects:
- Key remap:
http_only→httpOnly,expires=None→expires=-1(Playwright's session-cookie convention),sameSite="None". - Filtering: cookies missing
name/value/domain, or from domains outside the auth allowlist (regional Google ccTLDs +REQUIRED_COOKIE_DOMAINS∪OPTIONAL_COOKIE_DOMAINS), are silently skipped. - Return:
{"cookies": [...], "origins": []}— drop straight intostorage_state.json.
Cookie extraction (and Google-account selection) happens in the
rookiepy.<browser>(...) call: the storage state reflects whichever Google
account is currently active in the source browser. To pick up cookies for
optional surfaces (YouTube, Docs, MyAccount, Mail), extend the rookiepy
domains= argument with OPTIONAL_COOKIE_DOMAINS (or a label-specific
subset via OPTIONAL_COOKIE_DOMAINS_BY_LABEL) — both imported from
notebooklm.auth alongside REQUIRED_COOKIE_DOMAINS. The CLI equivalent
is notebooklm login --browser-cookies <browser> [--include-domains youtube,docs,...].
Environment Variable Support:
The library respects these environment variables for authentication:
| Variable | Description |
|---|---|
NOTEBOOKLM_HOME |
Base directory for config files (default: ~/.notebooklm) |
NOTEBOOKLM_PROFILE |
Active profile name (default: default) |
NOTEBOOKLM_AUTH_JSON |
Inline auth JSON - no file needed (for CI/CD) |
Precedence (highest to lowest):
- Explicit
pathargument tofrom_storage() NOTEBOOKLM_AUTH_JSONenvironment variable- Explicit
profileargument tofrom_storage(profile="work") NOTEBOOKLM_PROFILEenvironment variable (resolves to~/.notebooklm/profiles/<name>/storage_state.json)- Active profile from
~/.notebooklm/active_profile ~/.notebooklm/profiles/default/storage_state.json~/.notebooklm/storage_state.json(legacy fallback)
CI/CD Example:
import os
# Set auth JSON from environment (e.g., GitHub Actions secret)
os.environ["NOTEBOOKLM_AUTH_JSON"] = '{"cookies": [...]}'
# Client automatically uses the env var
async with await NotebookLMClient.from_storage() as client:
notebooks = await client.notebooks.list()The library raises RPCError for API failures:
from notebooklm import RPCError
try:
result = await client.notebooks.create("Test")
except RPCError as e:
print(f"RPC failed: {e}")
# Common causes:
# - Session expired (re-run `notebooklm login`)
# - Rate limited (wait and retry)
# - Invalid parametersAutomatic Refresh: The client automatically refreshes CSRF tokens when authentication errors are detected. This happens transparently during any API call - you don't need to handle it manually.
When an RPC call fails with an auth error (HTTP 401/403 or auth-related message):
- The client fetches fresh tokens from the NotebookLM homepage
- Waits briefly to avoid rate limiting
- Retries the failed request automatically
Manual Refresh: For proactive refresh (e.g., before a long-running operation):
async with await NotebookLMClient.from_storage() as client:
# Manually refresh CSRF token and session ID
await client.refresh_auth()Note: If your session cookies have fully expired (not just CSRF tokens), you'll need to re-run notebooklm login.
Probe-then-retry for create operations. When a network or server error (5xx / 429 / connection drop) interrupts a create call, the client surfaces the failure immediately rather than blindly retrying. For the methods listed below, the client then probes the server to discover whether the resource was already created before attempting a retry. This prevents duplicate resources when the server accepted the request but the response was lost in transit. The probe runs automatically — no opt-in keyword is required.
The following methods are idempotent under retry:
| Method | Probe |
|---|---|
client.notebooks.create(title) |
Snapshot notebook IDs before, list after a transport failure, return the single new notebook with the matching title (or raise on ambiguity). |
client.sources.add_url(notebook_id, url) |
List the notebook's sources, return the existing source whose url exactly matches. |
client.sources.add_url(notebook_id, youtube_url) |
Same probe via canonical YouTube URL. |
client.sources.add_text(notebook_id, title, content) is not retry-safe: text sources lack a reliable server-side dedupe key (titles aren't unique; content isn't exposed in the source list). The default behavior is unchanged from previous releases. If you want explicit failure rather than possible silent duplication on retry, opt in:
from notebooklm import NonIdempotentRetryError
try:
await client.sources.add_text(nb_id, "Title", "Content", idempotent=True)
except NonIdempotentRetryError:
# Embed a UUID in the title and dedupe client-side instead.
...client.sources.add_file(...) and client.sources.add_drive(...) are not yet covered by the probe-then-retry wrapper — a transport failure during these calls may produce a duplicate source on retry. Tracked as a separate fix.
This section is the canonical answer to "is NotebookLMClient safe to use from
multiple coroutines / threads / processes / event loops?" The concurrency model
documented here has been hardened to support high-concurrency programmatic
clients (long-running agents, parallel asyncio.gather over many notebooks,
multi-process fleets).
If you only read one subsection, read Non-guarantees — the guard rails are narrow.
Per-loop async safety. A NotebookLMClient instance is bound to the
event loop on which it was opened. A loop-affinity guard checks
the active loop on the authed POST hot path — rpc_call() →
query_post() → _perform_authed_post() — and raises a clear RuntimeError
when the instance is re-used from a different loop. Scope limitation: the
guard fires on the hot path only. Two cold paths reach asyncio primitives
before the guard runs:
ChatAPI.askcallsnext_reqid()before itsquery_post() → _perform_authed_postchain. The first cross-loop call's_reqid_lockwill raise a deep asyncioRuntimeError(not the friendly loop-affinity message).close()awaitssave_cookies+acloseand never routes through_perform_authed_post; cross-loop close gets a deep asyncio error.
In both cases you still get a RuntimeError — just an opaque one. Best
practice: one client per loop, full stop.
Refresh deduplication. Concurrent RPCs that all
trigger a token refresh share a single underlying refresh attempt via
_refresh_lock + asyncio.shield. Waiter cancellation does not kill the
shared refresh task; the next caller in line picks up the finished tokens.
Request-ID monotonicity. next_reqid() returns a monotonic
sequence across concurrent coroutines on the same client. Guarded by
_reqid_lock.
Per-attempt and across-attempt auth snapshot atomicity.
_auth_snapshot_lock serializes _AuthSnapshot reads against the
refresh-side mutation block — without this, a token refresh that
completed between the URL-build step and the POST step could produce a
URL stitched together from a mix of pre- and post-refresh credentials
(stale session_id, fresh authuser, etc.), which Google rejects with
an opaque auth error. _build_url consumes the snapshot rather than
reading live session_id / authuser / account_email fields, so the
URL and the headers come from a single consistent auth tuple. (This
obsoletes the warning in the older "Concurrency model" subsection
above.)
Idempotent create RPCs. The following calls are
idempotent under retry via probe-then-create (when idempotent=True,
which is the default):
client.notebooks.create(title)client.sources.add_url(notebook_id, url)(YouTube URLs are auto-detected and routed through the YouTube source pathway internally)
client.sources.add_text(notebook_id, title, content) is declared
non-idempotent: text sources lack a reliable server-side dedupe key
(Google permits duplicate titles, and content is not exposed in source
listings). With idempotent=True it raises NonIdempotentRetryError. If
you set disable_internal_retries=True on the client, the probe-then-retry
wrapper is skipped entirely and the caller is responsible for retry
semantics.
Cancellation safety. Several paths are now shielded against cancellation:
close()is shielded; Ctrl-C during shutdown will not leak the underlyinghttpx.AsyncClient.refresh_auth()runs the shared refresh task underasyncio.shield; cancelling a waiter does not kill the shared refresh.- Upload finalize is shielded; on cancel signal we issue a best-effort Scotty (Google's internal resumable upload service) cancel to release the server-side upload slot.
notes.createshields theUPDATE_NOTEfinalize step and cleans up the partial note on cancel.wait_for_sourcescancels sibling pollers on the first poller's failure rather than letting them race to emit error messages.wait_for_completionuses a leader/follower polling-dedupe registry with a shielded leader task — follower cancellation does not kill the leader's poll.
Idempotent file uploads. SourcesAPI.add_file closes its file handle
under a TOCTOU-safe path and gates concurrent uploads via the
max_concurrent_uploads semaphore so a large fan-out can't exhaust the
per-process file descriptor limit.
NOT thread-safe. A NotebookLMClient instance must not be shared
across OS threads. The internal locks (_refresh_lock, _reqid_lock,
_auth_snapshot_lock) are asyncio.Lock instances and do not protect
against concurrent OS-thread access. If you need a client per thread,
construct one per thread.
NOT reusable across event loops. Per the loop-affinity guard above,
the hot path raises RuntimeError when an instance is re-used on a
different loop. Cold paths (next_reqid(), close()) raise an opaque
asyncio RuntimeError instead — same outcome, less helpful message.
ChatAPI._cache is per-instance. Chat-conversation IDs cached
inside a NotebookLMClient (on the client.chat sub-client) are not
shared across clients in the same process and never persisted across
processes. Two clients pointed at the same notebook will not share
follow-up context.
Cookies in storage are eventually-consistent across processes. When
multiple processes share a storage path, an OS-level file lock plus a
snapshot/delta merge (see docs/auth-keepalive.md §3.4) keep concurrent
writers from corrupting the file. They may, however, observe brief
staleness — a write committed by process A may not be visible to a
sibling read in process B until the next refresh cycle. Within a single
process, in-process dedupe ensures only one keepalive task runs per
canonicalized storage path.
One client per app, dependency-injected. A NotebookLMClient is
designed to be a long-lived process resource. In FastAPI, attach it to
the app lifespan:
from contextlib import asynccontextmanager
from fastapi import FastAPI, Depends, Request
from notebooklm import NotebookLMClient
@asynccontextmanager
async def lifespan(app: FastAPI):
async with await NotebookLMClient.from_storage() as client:
app.state.notebooklm = client
yield
# client.close() happens via __aexit__
def get_client(request: Request) -> NotebookLMClient:
return request.app.state.notebooklm
app = FastAPI(lifespan=lifespan)
@app.get("/notebooks")
async def list_notebooks(client: NotebookLMClient = Depends(get_client)):
return await client.notebooks.list()Constraint: FastAPI runs on a single event loop per worker, so one client
per worker is correct. If you run multiple Uvicorn workers, each worker
owns its own client. Do not stash a NotebookLMClient on a
process-global outside the lifespan — multi-worker servers fork the
process and you will end up with the same client object referencing
different event loops.
ConnectionLimits tuning. The HTTP pool defaults
(max_connections=100, max_keepalive_connections=50,
keepalive_expiry=30.0) are sized for typical batchexecute fan-out: a
few dozen concurrent RPCs against a single host with keep-alives held
for an interactive session. Tune via notebooklm.types.ConnectionLimits:
from notebooklm import NotebookLMClient
from notebooklm.types import ConnectionLimits
limits = ConnectionLimits(
max_connections=200, # widen the pool for a heavy worker
max_keepalive_connections=100,
keepalive_expiry=60.0,
)
client = NotebookLMClient(auth, limits=limits, max_concurrent_rpcs=64)For single-request CLI workloads the defaults are wasteful but harmless.
max_concurrent_rpcs knob. A semaphore at
_perform_authed_post caps simultaneous in-flight RPC POSTs. Default
16 — well below the default pool size so short-lived helper requests
(refresh GETs, upload preflights) still have pool headroom. Pass None
to opt out entirely (e.g. when an external rate-limiter handles
back-pressure). The backoff for 429 / 5xx retries is held inside the
semaphore for a circuit-breaker effect: a slow request keeps its slot
while it waits, so the gate naturally throttles fan-out when the server
is unhappy.
Worst-case slot hold time:
| Path | Bound | Default |
|---|---|---|
| 429 retry loop | rate_limit_max_retries × MAX_RETRY_AFTER_SECONDS |
3 × 300 = 900s |
| 5xx / network retry loop | server_error_max_retries × 30s (capped backoff) |
3 × 30 = 90s |
If your workload's tail latency is sensitive, lower
rate_limit_max_retries or tighten the semaphore — slot hold time on
the 429 path is the load-bearing variable.
Constraint (enforced at construction): max_concurrent_rpcs ≤ ConnectionLimits.max_connections. A higher RPC ceiling than the pool
capacity would let the semaphore admit requests the pool can't fulfill,
producing opaque httpx.PoolTimeout errors instead of clean
back-pressure. The NotebookLMClient.__init__ / from_storage()
constructor raises ValueError if this constraint is violated. The
semaphore floor (max_concurrent_rpcs ≥ 1 when not None) is enforced
inside Session.
max_concurrent_uploads knob. Default 4. Gates
file-upload streaming independently from the RPC throttle because
uploads use their own httpx.AsyncClient (Scotty endpoint) and do not
share the RPC connection pool. The motivation is FD exhaustion: each
in-flight upload holds one open file descriptor for the duration of the
upload, so an unbounded fan-out blows the per-process FD limit. None
resolves to the default (4); truly unbounded uploads are intentionally
not supported. Must be ≥ 1 when set explicitly.
Rate-limit retry defaults. rate_limit_max_retries=3,
server_error_max_retries=3. The 429 path honors the Retry-After
header when parseable (clamped at MAX_RETRY_AFTER_SECONDS = 300s);
when the header is absent or unparseable, the loop falls back to
exponential backoff min(2^attempt, 30) seconds with ±20% jitter
(where attempt starts at 0, so the first retry sleeps ~1 s ± 20%
before doubling), matching the 5xx path. Set either to 0 to restore
the pre-retry-loop behavior of raising RateLimitError / ServerError
immediately.
Observability hooks. The client exposes stdlib-only observability so applications can choose their own metrics backend:
from notebooklm import NotebookLMClient, correlation_id
events = []
async with await NotebookLMClient.from_storage(on_rpc_event=events.append) as client:
with correlation_id("batch-import-42"):
await client.notebooks.list()
snapshot = client.metrics_snapshot()
print(snapshot.rpc_calls_succeeded, snapshot.rpc_queue_wait_seconds_max)on_rpc_event receives a RpcTelemetryEvent for each logical RPC
completion. metrics_snapshot() returns cumulative counters for RPC
success/failure, retry counts, semaphore queue waits, upload queue waits,
and internal lock wait time. The package does not depend on Prometheus or
OpenTelemetry; forward these values to whichever backend your service uses.
Graceful shutdown. Long-lived services can stop admitting new client operations and wait for in-flight operations before closing:
await client.close(drain=True, drain_timeout=30.0)client.drain(timeout=...) is also available when your framework owns
transport shutdown separately. Once drain starts, new operations raise
RuntimeError; if the timeout expires, the client remains in draining mode.
close(drain=True, ...) still closes the transport after a drain timeout and
then re-raises the timeout.
Upload-timeout configuration. client.sources.add_file(...)
and the related upload entry points accept an upload_timeout argument
that is decoupled from the global timeout. A long-running upload of
a large file should not have to widen the global HTTP timeout to
succeed; pass upload_timeout=600.0 (or larger) to the relevant call
sites instead.
For a service that handles multiple NotebookLM tenants (different
AuthTokens, typically one per user), spin up one
NotebookLMClient per tenant. There is no cross-tenant
ChatAPI._cache bleed (the cache is per-instance), and the
loop-affinity guard plus the per-instance refresh state means tenants
cannot accidentally observe each other's auth.
Cookie storage paths must be canonicalized so two clients pointing at the same logical storage file don't run racing keepalive loops; the keepalive code path handles this automatically.
These validations run in NotebookLMClient.__init__ /
NotebookLMClient.from_storage() (and Session.__init__ for the
floor checks). All raise ValueError:
max_concurrent_rpcs ≤ ConnectionLimits.max_connectionswhen both are set (skipped when either isNone).max_concurrent_rpcs ≥ 1when notNone.max_concurrent_uploads ≥ 1when notNone.rate_limit_max_retries ≥ 0.server_error_max_retries ≥ 0.keepalivemust beNoneor a positive finite number; values belowkeepalive_min_interval(default60s) are clamped up to that floor.
Session (in src/notebooklm/_session.py) is the orchestrator that owns
the httpx.AsyncClient, glues the authed transport to RPC dispatch, and
holds the AuthTokens for the running session. The supporting state
(metrics, drain bookkeeping, request-id counter, transport plumbing,
conversation cache, etc.) is split across single-responsibility session
and kernel collaborator modules such as notebooklm._authed_transport,
notebooklm._rpc_executor, and notebooklm._transport_drain. The split
is internal — first-party legacy private callers can still import the
documented constants from notebooklm._core — but it matters when
reading the source, when writing unit tests against a stub host, or when
tracing where a particular ivar lives.
| Module | Owns | Notes |
|---|---|---|
_session |
Concrete Session orchestrator; HTTP client lifecycle; module-level constants (MAX_RETRY_AFTER_SECONDS, DEFAULT_TIMEOUT, etc.); is_auth_error, save_cookies_to_storage; error-injection seam _get_error_injection_mode. |
Concrete implementation. |
_core |
Compatibility shim that re-exports legacy private import names. | Legacy private imports continue to resolve while first-party code uses _session. |
_session_auth |
AuthRefreshCoordinator: refresh-task lifecycle, refresh lock, _AuthSnapshot rotation. |
Lazy asyncio.Lock construction; never instantiated outside a running loop. |
_conversation_cache |
Per-instance LRU _conversation_cache for ChatAPI continuity. |
Pure in-process state; not shared across Session instances. |
_cookie_persistence |
Cookie-jar → storage-state serialization, __Secure-1PSIDTS rotation. |
Exposes a SaveCookiesToStorage Protocol host. |
_transport_drain |
TransportDrainTracker: in-flight transport counters, _TransportOperationToken, lazy asyncio.Condition powering client.drain(...). |
Construction is event-loop-agnostic; the Condition is allocated on first use. |
_session_lifecycle |
ClientLifecycle: loop-affinity guard, aclose plumbing, keepalive task wiring. |
Session lifecycle collaborator. |
_client_metrics |
ClientMetrics: ClientMetricsSnapshot counters, _metrics_lock, on_rpc_event callback, queue-wait recorders. |
__init__ is event-loop-agnostic; emit_rpc_event is async and intentionally awaits the user callback (back-pressure). |
_polling_registry |
Pending-poll registry shared by long-running artifact generations. | Used by artifacts and the legacy Session._pending_polls compatibility bridge. |
_reqid_counter |
ReqidCounter: monotonic _reqid for the chat backend, lazy asyncio.Lock for concurrent ChatAPI.ask callers. |
Baseline _value=100000, default step=100000 — both are chat-API contract values; do not change. |
_rpc_executor |
RPC dispatch executor; exposes DecodeResponse and RpcOwner Protocols so callers can be unit-tested against a stub. |
Session.rpc_call delegates here. |
_authed_transport |
Authed HTTP POST path, retry loops (429 + 5xx), _AuthedTransportHost Protocol. |
Owns _TransportAuthExpired / _TransportRateLimited / _TransportServerError transport-level exceptions. |
Feature APIs depend on the shared notebooklm._session_contracts.Session
Protocol rather than concrete session internals.
If you previously imported from notebooklm._core_* modules, see
docs/migration-tier-12-to-13.md for the
Tier 12 → Tier 13 rename table. The legacy notebooklm._core import path
still resolves via a compatibility shim.
Main client class providing access to all APIs.
class NotebookLMClient:
notebooks: NotebooksAPI # Notebook operations
sources: SourcesAPI # Source management
artifacts: ArtifactsAPI # Artifact operations (audio, video, reports, etc.)
chat: ChatAPI # Conversations
research: ResearchAPI # Web/Drive research
notes: NotesAPI # User notes
settings: SettingsAPI # User settings (language, etc.)
sharing: SharingAPI # Notebook sharing
auth: AuthTokens # Current authentication tokens
is_connected: bool # Connection state
@classmethod
async def from_storage(
cls, path: str | None = None, timeout: float = 30.0,
profile: str | None = None,
keepalive: float | None = None,
keepalive_min_interval: float = 60.0,
rate_limit_max_retries: int = 3,
server_error_max_retries: int = 3,
limits: ConnectionLimits | None = None,
) -> "NotebookLMClient"
def __init__(
self, auth: AuthTokens, timeout: float = 30.0,
storage_path: Path | None = None,
keepalive: float | None = None,
keepalive_min_interval: float = 60.0,
rate_limit_max_retries: int = 3,
server_error_max_retries: int = 3,
limits: ConnectionLimits | None = None,
)
async def refresh_auth(self) -> AuthTokens
async def rpc_call(
self,
method: RPCMethod,
params: list[Any],
source_path: str = "/",
allow_null: bool = False,
_is_retry: bool = False,
*,
disable_internal_retries: bool = False,
) -> AnyRPCMethod is imported from notebooklm.rpc for raw-RPC calls; Any is
typing.Any. _is_retry is present only to preserve parity with the core
delegator and should normally be left as False.
Long-lived clients: pass keepalive=<seconds> to spawn a background task
that periodically pokes accounts.google.com and persists any rotated
__Secure-1PSIDTS cookie to storage_state.json. This keeps a worker /
agent / long-running async with block from silently staling out. Disabled
by default (keepalive=None). Values below keepalive_min_interval (default
60.0) are clamped up to that floor. See Cookie freshness for long-running
/ unattended use
for the full layered story.
Retry behavior: the client retries transient failures transparently.
server_error_max_retries(default3) retries HTTP 5xx and network-layerhttpx.RequestError(timeouts, connect errors) with exponential backoff capped at 30 seconds (min(2 ** attempt, 30), plus ±20% jitter to desynchronize concurrent retries). Set to0to disable.rate_limit_max_retries(default3) retries HTTP 429 responses. Each retry sleeps for the server'sRetry-Aftervalue when parseable; otherwise the loop falls back to the same capped-exponential-backoff schedule used for 5xx (min(2 ** attempt, 30)seconds with ±20% jitter) so the positive default is still useful when Google omits the hint. Set to0to raiseRateLimitErrorimmediately (e.g. when the calling code implements its own bespoke back-off policy). Mutating create RPCs (notebooks.create,sources.add_url) opt out of this loop viadisable_internal_retriesso the API-layeridempotent_createprobe-then-retry wrapper can own recovery for mutating calls.limitsaccepts aConnectionLimitsdataclass to tune the underlyinghttpxconnection pool. The default (ConnectionLimits()) setsmax_connections=100,max_keepalive_connections=50,keepalive_expiry=30.0— sized for typical batchexecute fan-out. Widen for heavy concurrent workloads such as FastAPI/Django services that share one client across many requests.
from notebooklm import ConnectionLimits, NotebookLMClient
# Default ``rate_limit_max_retries=3`` is on; widen the pool for a heavy worker
async with await NotebookLMClient.from_storage(
limits=ConnectionLimits(max_connections=200, max_keepalive_connections=100),
) as client:
...
# Opt out of automatic 429 retries (e.g. for a bespoke back-off layer)
async with await NotebookLMClient.from_storage(rate_limit_max_retries=0) as client:
...CLI equivalent: Notebook Commands — notebooklm list, create, delete, rename, summary.
| Method | Parameters | Returns | Description |
|---|---|---|---|
list() |
- | list[Notebook] |
List all notebooks |
create(title) |
title: str |
Notebook |
Create a notebook |
get(notebook_id) |
notebook_id: str |
Notebook |
Get notebook details |
delete(notebook_id) |
notebook_id: str |
bool |
Delete a notebook |
rename(notebook_id, new_title) |
notebook_id: str, new_title: str |
Notebook |
Rename a notebook |
get_description(notebook_id) |
notebook_id: str |
NotebookDescription |
Get AI summary and topics |
get_metadata(notebook_id) |
notebook_id: str |
NotebookMetadata |
Get notebook metadata and sources |
get_summary(notebook_id) |
notebook_id: str |
str |
Get raw summary text |
share(notebook_id, public=True, artifact_id=None) |
notebook_id: str, bool, str | None |
dict |
Deprecated; use client.sharing.set_public() for notebook-level public sharing |
get_share_url(notebook_id, artifact_id=None) |
notebook_id: str, str | None |
str |
Get a share URL |
remove_from_recent(notebook_id) |
notebook_id: str |
None |
Remove from recently viewed |
get_raw(notebook_id) |
notebook_id: str |
Any |
Get raw API response data |
Example:
# List all notebooks
notebooks = await client.notebooks.list()
for nb in notebooks:
print(f"{nb.id}: {nb.title} ({nb.sources_count} sources)")
# Create and rename
nb = await client.notebooks.create("Draft")
nb = await client.notebooks.rename(nb.id, "Final Version")
# Get AI-generated description (parsed with suggested topics)
desc = await client.notebooks.get_description(nb.id)
print(desc.summary)
for topic in desc.suggested_topics:
print(f" - {topic.question}")
# Get raw summary text (unparsed)
summary = await client.notebooks.get_summary(nb.id)
print(summary)
# Get metadata for automation or exports
metadata = await client.notebooks.get_metadata(nb.id)
print(metadata.title)
# Enable public sharing and fetch the URL
await client.sharing.set_public(nb.id, public=True)
url = await client.notebooks.get_share_url(nb.id)
print(url)get_summary vs get_description:
get_summary()returns the raw summary text stringget_description()returns aNotebookDescriptionobject with the parsed summary and a list ofSuggestedTopicobjects for suggested questions
CLI equivalent: Source Commands — notebooklm source add, list, get, fulltext, guide, rename, refresh, delete, wait.
| Method | Parameters | Returns | Description |
|---|---|---|---|
list(notebook_id) |
notebook_id: str |
list[Source] |
List sources |
get(notebook_id, source_id) |
str, str |
Source | None |
Get source details (returns None if not found) |
get_fulltext(notebook_id, source_id, *, output_format="text") |
str, str, *, output_format: Literal["text", "markdown"] |
SourceFulltext |
Get full content; "markdown" requires the optional markdownify extra |
get_guide(notebook_id, source_id) |
str, str |
dict |
Get AI-generated summary and keywords |
add_url(notebook_id, url, wait=False, wait_timeout=120.0) |
str, str, bool, float |
Source |
Add URL source (autodetects YouTube URLs and routes them appropriately) |
add_text(notebook_id, title, content, wait=False, wait_timeout=120.0) |
str, str, str, bool, float |
Source |
Add text content |
add_file(notebook_id, file_path, mime_type=None, wait=False, wait_timeout=120.0, *, title=None, on_progress=None) |
str, str | Path, str | None, bool, float, *, str | None, Callable | None |
Source |
Upload file. mime_type is deprecated and ignored (server infers from filename); passing non-None emits DeprecationWarning and is scheduled for removal in v0.6.0. title (keyword-only) sets the display name via a post-upload UPDATE_SOURCE and forces a brief registration wait even when wait=False. on_progress(bytes_sent, total_bytes) may be sync or async. |
add_drive(notebook_id, file_id, title, mime_type) |
str, str, str, str |
Source |
Add Google Drive doc |
rename(notebook_id, source_id, new_title) |
str, str, str |
Source |
Rename source |
refresh(notebook_id, source_id) |
str, str |
bool |
Refresh URL/Drive source |
check_freshness(notebook_id, source_id) |
str, str |
bool |
Check if source needs refresh |
delete(notebook_id, source_id) |
str, str |
bool |
Delete source |
wait_until_ready(notebook_id, source_id, timeout=120.0, ...) |
str, str, float, ... |
Source |
Poll until status == READY (fully processed). Raises SourceTimeoutError/SourceProcessingError/SourceNotFoundError. |
wait_until_registered(notebook_id, source_id, timeout=30.0, ...) |
str, str, float, ... |
Source |
Poll until the source is visible server-side (any non-ERROR status). Completes quickly (seconds for typical sources); intended for narrow follow-up RPCs (e.g. UPDATE_SOURCE) that only require registration, not full processing. |
wait_for_sources(notebook_id, source_ids, timeout=120.0, **kwargs) |
str, list[str], float, ... |
list[Source] |
Wait for multiple sources to become ready in parallel. Per-source timeout; **kwargs are forwarded to wait_until_ready. |
Example:
from pathlib import Path
# Add various source types
await client.sources.add_url(nb_id, "https://example.com/article")
await client.sources.add_url(nb_id, "https://youtube.com/watch?v=...") # YouTube URLs autodetected
await client.sources.add_text(nb_id, "My Notes", "Content here...")
await client.sources.add_file(nb_id, Path("./document.pdf"))
# Upload a file with a custom display title (rename happens after upload via
# UPDATE_SOURCE — a brief registration wait runs even when wait=False so the
# rename can land). The mime_type kwarg is deprecated and scheduled for
# removal in v0.6.0; the server infers
# MIME type from the filename extension.
await client.sources.add_file(nb_id, Path("./document.pdf"), title="Q4 Strategy Memo")
# Wait for several uploads to finish processing in parallel
ids = [
(await client.sources.add_url(nb_id, "https://example.com/a")).id,
(await client.sources.add_url(nb_id, "https://example.com/b")).id,
]
ready = await client.sources.wait_for_sources(nb_id, ids, timeout=180)
# Narrow wait: only block until the source is visible server-side (not fully
# processed). Use this before follow-up RPCs like UPDATE_SOURCE.
registered = await client.sources.wait_until_registered(nb_id, ids[0])
# List and manage
sources = await client.sources.list(nb_id)
for src in sources:
print(f"{src.id}: {src.title} ({src.kind})")
await client.sources.rename(nb_id, src.id, "Better Title")
await client.sources.refresh(nb_id, src.id) # Re-fetch URL content
# Check if a source needs refreshing (content changed)
is_fresh = await client.sources.check_freshness(nb_id, src.id)
if not is_fresh:
await client.sources.refresh(nb_id, src.id)
# Get full indexed content (what NotebookLM uses for answers)
fulltext = await client.sources.get_fulltext(nb_id, src.id)
print(f"Content ({fulltext.char_count} chars): {fulltext.content[:500]}...")
# Get AI-generated summary and keywords
guide = await client.sources.get_guide(nb_id, src.id)
print(f"Summary: {guide['summary']}")
print(f"Keywords: {guide['keywords']}")CLI equivalent: Artifact Commands — notebooklm artifact list, get, rename, delete, export, poll, wait. Generation methods map to Generate Commands (notebooklm generate <type>); download methods map to Download Commands (notebooklm download <type>).
| Method | Parameters | Returns | Description |
|---|---|---|---|
list(notebook_id, type=None) |
str, int |
list[Artifact] |
List artifacts |
get(notebook_id, artifact_id) |
str, str |
Artifact | None |
Get artifact details (returns None if not found) |
delete(notebook_id, artifact_id) |
str, str |
bool |
Delete artifact |
rename(notebook_id, artifact_id, new_title) |
str, str, str |
None |
Rename artifact |
poll_status(notebook_id, task_id) |
str, str |
GenerationStatus |
Check generation status |
wait_for_completion(notebook_id, task_id, ...) |
str, str, ... |
GenerationStatus |
Wait for generation. Pass on_status_change(status) for sync or async progress callbacks. |
CLI equivalent: notebooklm artifact list --type <audio|video|slide-deck|quiz|flashcard|infographic|data-table|mind-map|report> (see Artifact Commands).
| Method | Parameters | Returns | Description |
|---|---|---|---|
list_audio(notebook_id) |
str |
list[Artifact] |
List audio overview artifacts |
list_video(notebook_id) |
str |
list[Artifact] |
List video overview artifacts |
list_reports(notebook_id) |
str |
list[Artifact] |
List report artifacts (Briefing Doc, Study Guide, Blog Post) |
list_quizzes(notebook_id) |
str |
list[Artifact] |
List quiz artifacts |
list_flashcards(notebook_id) |
str |
list[Artifact] |
List flashcard artifacts |
list_infographics(notebook_id) |
str |
list[Artifact] |
List infographic artifacts |
list_slide_decks(notebook_id) |
str |
list[Artifact] |
List slide deck artifacts |
list_data_tables(notebook_id) |
str |
list[Artifact] |
List data table artifacts |
CLI equivalent: Generate Commands — notebooklm generate audio, video, slide-deck, quiz, flashcards, infographic, data-table, mind-map, report.
| Method | Parameters | Returns | Description |
|---|---|---|---|
generate_audio(...) |
See below | GenerationStatus |
Generate podcast |
generate_video(...) |
See below | GenerationStatus |
Generate video |
generate_report(...) |
See below | GenerationStatus |
Generate report |
generate_quiz(...) |
See below | GenerationStatus |
Generate quiz |
generate_flashcards(...) |
See below | GenerationStatus |
Generate flashcards |
generate_slide_deck(...) |
See below | GenerationStatus |
Generate slide deck |
generate_infographic(...) |
See below | GenerationStatus |
Generate infographic |
generate_data_table(...) |
See below | GenerationStatus |
Generate data table |
generate_mind_map(...) |
See below | dict |
Generate mind map |
CLI equivalent: Download Commands — notebooklm download audio, video, slide-deck, infographic, report, mind-map, data-table, quiz, flashcards.
| Method | Parameters | Returns | Description |
|---|---|---|---|
download_audio(notebook_id, output_path, artifact_id=None) |
str, str, str |
str |
Download audio to file (MP4/MP3) |
download_video(notebook_id, output_path, artifact_id=None) |
str, str, str |
str |
Download video to file (MP4) |
download_infographic(notebook_id, output_path, artifact_id=None) |
str, str, str |
str |
Download infographic to file (PNG) |
download_slide_deck(notebook_id, output_path, artifact_id=None, output_format="pdf") |
str, str, str, str |
str |
Download slide deck as PDF or PPTX (output_format: "pdf" or "pptx") |
download_report(notebook_id, output_path, artifact_id=None) |
str, str, str |
str |
Download report as Markdown (.md) |
download_mind_map(notebook_id, output_path, artifact_id=None) |
str, str, str |
str |
Download mind map as JSON (.json) |
download_data_table(notebook_id, output_path, artifact_id=None) |
str, str, str |
str |
Download data table as CSV (.csv) |
download_quiz(notebook_id, output_path, artifact_id=None, output_format="json") |
str, str, str, str |
str |
Download quiz (json/markdown/html) |
download_flashcards(notebook_id, output_path, artifact_id=None, output_format="json") |
str, str, str, str |
str |
Download flashcards (json/markdown/html) |
Download Methods:
# Download the most recent completed audio overview
path = await client.artifacts.download_audio(nb_id, "podcast.mp4")
# Download a specific audio artifact by ID
path = await client.artifacts.download_audio(nb_id, "podcast.mp4", artifact_id="abc123")
# Download video overview
path = await client.artifacts.download_video(nb_id, "video.mp4")
# Download infographic
path = await client.artifacts.download_infographic(nb_id, "infographic.png")
# Download slide deck as PDF
path = await client.artifacts.download_slide_deck(nb_id, "./slides.pdf")
# Returns: "./slides.pdf"
# Download report as Markdown
path = await client.artifacts.download_report(nb_id, "./study-guide.md")
# Extracts markdown content from Briefing Doc, Study Guide, Blog Post, etc.
# Download mind map as JSON
path = await client.artifacts.download_mind_map(nb_id, "./concept-map.json")
# JSON structure: {"name": "Topic", "children": [{"name": "Subtopic", ...}]}
# Download data table as CSV
path = await client.artifacts.download_data_table(nb_id, "./data.csv")
# CSV uses UTF-8 with BOM encoding for Excel compatibility
# Download quiz as JSON (default)
path = await client.artifacts.download_quiz(nb_id, "quiz.json")
# Download quiz as markdown with answers marked
path = await client.artifacts.download_quiz(nb_id, "quiz.md", output_format="markdown")
# Download flashcards as JSON (normalizes f/b to front/back)
path = await client.artifacts.download_flashcards(nb_id, "cards.json")
# Download flashcards as markdown
path = await client.artifacts.download_flashcards(nb_id, "cards.md", output_format="markdown")Notes:
- If
artifact_idis not specified, downloads the first completed artifact of that type - Raises
ValueErrorif no completed artifact is found - Some URLs require browser-based download (handled automatically)
- Report downloads extract the markdown content from the artifact
- Mind map downloads return a JSON tree structure with
nameandchildrenfields - Data table downloads parse the complex rich-text format into CSV rows/columns
- Quiz/flashcard formats:
json(structured),markdown(readable),html(raw) - Downloads automatically use the storage path from
from_storage(path=...)or the resolved profile for cookie authentication
Export artifacts to Google Docs or Google Sheets.
CLI equivalent: notebooklm artifact export <id> --title TEXT --type [docs|sheets] (see Artifact Commands).
| Method | Parameters | Returns | Description |
|---|---|---|---|
export_report(notebook_id, artifact_id, title="Export", export_type=ExportType.DOCS) |
str, str, str, ExportType |
Any |
Export report to Google Docs/Sheets |
export_data_table(notebook_id, artifact_id, title="Export") |
str, str, str |
Any |
Export data table to Google Sheets |
export(notebook_id, artifact_id=None, content=None, title="Export", export_type=ExportType.DOCS) |
str, str | None, str | None, str, ExportType |
Any |
Generic export to Docs/Sheets. All trailing parameters are optional with defaults; pass content=... to export inline content without a pre-existing artifact. |
Export Types (ExportType enum):
ExportType.DOCS(1): Export to Google DocsExportType.SHEETS(2): Export to Google Sheets
from notebooklm import ExportType
# Export a report to Google Docs
result = await client.artifacts.export_report(
nb_id,
artifact_id="report_123",
title="My Briefing Doc",
export_type=ExportType.DOCS
)
# result contains the Google Docs URL
# Export a data table to Google Sheets
result = await client.artifacts.export_data_table(
nb_id,
artifact_id="table_456",
title="Research Data"
)
# result contains the Google Sheets URL
# Generic export (e.g., export any artifact to Sheets). All trailing
# parameters have defaults: `artifact_id=None`, `content=None`,
# `title="Export"`, `export_type=ExportType.DOCS`. Supply `content=...`
# instead of `artifact_id=...` to export inline text without a pre-existing
# artifact.
result = await client.artifacts.export(
nb_id,
artifact_id="artifact_789",
title="Exported Content",
export_type=ExportType.SHEETS
)Generation Methods:
from notebooklm import (
AudioFormat,
AudioLength,
VideoFormat,
VideoStyle,
ReportFormat,
QuizQuantity,
QuizDifficulty,
)
# Audio (podcast)
status = await client.artifacts.generate_audio(
notebook_id,
source_ids=None, # List of source IDs (None = all)
instructions="...", # Custom instructions
audio_format=AudioFormat.DEEP_DIVE, # DEEP_DIVE, BRIEF, CRITIQUE, DEBATE
audio_length=AudioLength.DEFAULT, # SHORT, DEFAULT, LONG
language="en"
)
# Video
status = await client.artifacts.generate_video(
notebook_id,
source_ids=None,
instructions="...",
video_format=VideoFormat.EXPLAINER, # EXPLAINER, BRIEF, CINEMATIC
video_style=VideoStyle.AUTO_SELECT, # AUTO_SELECT, CLASSIC, WHITEBOARD, KAWAII, ANIME, etc.
language="en"
)
# Report
status = await client.artifacts.generate_report(
notebook_id,
report_format=ReportFormat.STUDY_GUIDE, # BRIEFING_DOC, STUDY_GUIDE, BLOG_POST, CUSTOM
source_ids=None,
language="en",
custom_prompt=None, # Used with ReportFormat.CUSTOM
extra_instructions="..." # Optional append for built-in formats
)
# Quiz
status = await client.artifacts.generate_quiz(
notebook_id,
source_ids=None,
instructions="...",
quantity=QuizQuantity.MORE, # FEWER, STANDARD, MORE (MORE aliases STANDARD)
difficulty=QuizDifficulty.MEDIUM, # EASY, MEDIUM, HARD
)Waiting for Completion:
# Start generation
status = await client.artifacts.generate_audio(nb_id)
# Wait with polling
final = await client.artifacts.wait_for_completion(
nb_id,
status.task_id,
timeout=300, # Max wait time in seconds
initial_interval=5 # Initial seconds between polls
)
if final.is_complete:
path = await client.artifacts.download_audio(nb_id, "podcast.mp3")
print(f"Saved to: {path}")
else:
print(f"Failed or timed out: {final.status}")CLI equivalent: Chat Commands — notebooklm ask, configure, history.
| Method | Parameters | Returns | Description |
|---|---|---|---|
ask(notebook_id, question, ...) |
str, str, ... |
AskResult |
Ask a question |
configure(notebook_id, ...) |
str, ... |
None |
Set chat persona |
get_history(notebook_id, limit=100, conversation_id=None) |
str, int, str |
list[tuple[str, str]] |
Get Q&A pairs from most recent conversation |
get_conversation_id(notebook_id) |
str |
str | None |
Get most recent conversation ID from server |
delete_conversation(notebook_id, conversation_id) |
str, str |
bool |
DESTRUCTIVE. Permanently delete a server-side conversation (web UI's "Delete history" action). The next ask() with no conversation_id then starts a brand-new conversation. |
ask() Parameters:
async def ask(
notebook_id: str,
question: str,
source_ids: list[str] | None = None, # Limit to specific sources (None = all)
conversation_id: str | None = None, # Continue existing conversation
) -> AskResultConversation semantics (issue #659):
conversation_id=Nonematches the web UI's default: the server attaches the question to your current conversation on this notebook (or creates one if none exists). Repeatedask()calls withoutconversation_idextend the same conversation; they do not start fresh ones. The SDK fetches the server-recorded conversation_id viahPTbtcafter each new-conversation ask and surfaces it onAskResult.conversation_id, so passing it back asconversation_id=for follow-ups works as expected.conversation_id=<existing-id>is a follow-up: the question is appended to the named conversation.- To force a brand-new conversation, call
client.chat.delete_conversation(notebook_id, last_conversation_id)first — the server then has nothing to extend and the next null-convask()starts a fresh thread. This is destructive: deleted turns are not recoverable. The method mirrors the web UI's "Delete history" button (J7GthcRPC) and is the same primitive the CLI'snotebooklm ask --newis built on.
Example:
from notebooklm import ChatGoal, ChatResponseLength
# Ask questions (uses all sources)
result = await client.chat.ask(nb_id, "What are the main themes?")
print(result.answer)
print(result.conversation_id) # server-recorded id, fetched via hPTbtc
# Access source references (cited in answer as [1], [2], etc.)
for ref in result.references:
print(f"Citation {ref.citation_number}: Source {ref.source_id}")
# Ask using only specific sources
result = await client.chat.ask(
nb_id,
"Summarize the key points",
source_ids=["src_001", "src_002"]
)
# Continue conversation explicitly (or omit conversation_id — same effect
# while the most-recent conversation on the notebook stays unchanged).
result = await client.chat.ask(
nb_id,
"Can you elaborate on the first point?",
conversation_id=result.conversation_id
)
# Force a fresh conversation (destructive — turns are not recoverable).
# Mirrors the web UI's "Delete history" button.
last_conv_id = await client.chat.get_conversation_id(nb_id)
if last_conv_id:
await client.chat.delete_conversation(nb_id, last_conv_id)
result = await client.chat.ask(nb_id, "Start fresh — what are the themes?")
assert result.turn_number == 1
# Configure persona
await client.chat.configure(
nb_id,
goal=ChatGoal.LEARNING_GUIDE,
response_length=ChatResponseLength.LONGER,
custom_prompt="Focus on practical applications"
)CLI equivalent: Research Commands (notebooklm research status, wait) plus notebooklm source add-research (Source: add-research) for the combined start-and-import workflow.
| Method | Parameters | Returns | Description |
|---|---|---|---|
start(notebook_id, query, source, mode) |
str, str, str="web", str="fast" |
dict | None |
Start research (mode: "fast" or "deep"); raises ValidationError on invalid source/mode |
poll(notebook_id) |
str |
dict |
Check research status |
import_sources(notebook_id, task_id, sources) |
str, str, list |
list[dict] |
Import findings |
Method Signatures:
async def start(
notebook_id: str,
query: str,
source: str = "web", # "web" or "drive"
mode: str = "fast", # "fast" or "deep" (deep only for web)
) -> dict | None:
"""
Returns: {"task_id": str, "report_id": str, "notebook_id": str, "query": str, "mode": str},
or None if the RPC returns an empty/unexpected payload
Raises: ValidationError if source/mode combination is invalid
"""
async def poll(notebook_id: str) -> dict:
"""
Returns a dict for the LATEST research task. Top-level keys:
- task_id: str — task/report identifier
- status: str — "completed" | "in_progress" | "no_research"
- query: str — original research query
- sources: list[dict]
- summary: str — summary text when present
- report: str — deep-research report markdown when present
- tasks: list[dict] — ALL parsed tasks (same shape as the top-level
latest-task fields), additive across polls
Each source dict may include:
- url, title
- result_type: int — 1=web, 2=drive, 5=deep-research report entry
- research_task_id: str — task/report ID that produced this source
- report_markdown: str — deep-research report markdown (for type-5 entries)
"""
async def import_sources(notebook_id: str, task_id: str, sources: list[dict]) -> list[dict]:
"""
sources: list of dicts with 'url' and 'title' keys. Deep-research entries
from poll() may also include 'report_markdown', 'result_type', and
'research_task_id'.
Returns: list of imported sources with 'id' and 'title'.
Raises:
- ValidationError if `sources` contains entries from more than one
research task (`research_task_id` mismatch). Import each task's
sources in a separate call.
Caveats:
- The API response can under-report — fewer items may come back than
were actually imported. After this call, re-list with
`client.sources.list(notebook_id)` to verify the final source set.
- Entries without a `url` and without a complete report (`title` +
`report_markdown` + `result_type == 5`) are skipped with a warning.
"""Example:
# Start fast web research (default)
result = await client.research.start(nb_id, "AI safety regulations")
if result is None:
raise RuntimeError("Research start returned None")
task_id = result["task_id"]
# Start deep web research
result = await client.research.start(nb_id, "quantum computing", source="web", mode="deep")
if result is None:
raise RuntimeError("Research start returned None")
task_id = result["task_id"]
# Start fast Drive research
result = await client.research.start(nb_id, "project docs", source="drive", mode="fast")
if result is None:
raise RuntimeError("Research start returned None")
# Poll until complete
import asyncio
while True:
status = await client.research.poll(nb_id)
if status["status"] == "completed":
break
await asyncio.sleep(10)
# Import discovered sources
imported = await client.research.import_sources(nb_id, task_id, status["sources"][:5])
print(f"Imported {len(imported)} sources")CLI equivalent: Note Commands — notebooklm note list, create, get, save, rename, delete.
| Method | Parameters | Returns | Description |
|---|---|---|---|
list(notebook_id) |
str |
list[Note] |
List text notes (excludes mind maps) |
create(notebook_id, title="New Note", content="") |
str, str, str |
Note |
Create plain-text note (no citation anchors) |
create_from_chat(notebook_id, ask_result, *, title=None) |
str, AskResult, str | None |
Note |
Save a chat answer as a citation-rich note (issue #660) — the resulting note's [N] markers remain interactive hover-anchored citations in the NotebookLM web UI. Raises ValueError if ask_result.references is empty. |
get(notebook_id, note_id) |
str, str |
Optional[Note] |
Get note by ID |
update(notebook_id, note_id, content, title) |
str, str, str, str |
None |
Update note content and title |
delete(notebook_id, note_id) |
str, str |
bool |
Delete note |
list_mind_maps(notebook_id) |
str |
list[Any] |
List mind maps in the notebook |
delete_mind_map(notebook_id, mind_map_id) |
str, str |
bool |
Delete a mind map |
Example:
# Create and manage plain-text notes
note = await client.notes.create(nb_id, title="Meeting Notes", content="Discussion points...")
notes = await client.notes.list(nb_id)
# Update a note
await client.notes.update(nb_id, note.id, "Updated content", "New Title")
# Delete a note
await client.notes.delete(nb_id, note.id)
# Save a chat answer as a citation-rich note (preserves [N] hover links)
result = await client.chat.ask(nb_id, "What fruits are mentioned?")
if result.references:
note = await client.notes.create_from_chat(nb_id, result, title="Fruit Citations")
# Note: the NotebookLM server may auto-generate a "smart" title for
# citation-rich notes; note.title reflects what the server stored.Mind Maps:
Mind maps are stored internally using the same structure as notes but contain JSON data with hierarchical node information. The list() method excludes mind maps automatically, while list_mind_maps() returns only mind maps.
# List all mind maps in a notebook
mind_maps = await client.notes.list_mind_maps(nb_id)
for mm in mind_maps:
mm_id = mm[0] # Mind map ID is at index 0
print(f"Mind map: {mm_id}")
# Delete a mind map
await client.notes.delete_mind_map(nb_id, mind_map_id)Note: Mind maps are detected by checking if the content contains '"children":' or '"nodes":'` keys, which indicate JSON mind map data structure.
CLI equivalent: Language Commands — notebooklm language get, set, list. Account limits and tier do not yet have a dedicated CLI surface; use notebooklm status for context.
| Method | Parameters | Returns | Description |
|---|---|---|---|
get_output_language() |
none | Optional[str] |
Get current output language setting |
get_account_limits() |
none | AccountLimits |
Get account-level limits such as max notebooks and sources per notebook |
get_account_tier() |
none | AccountTier |
Get current NotebookLM subscription tier |
set_output_language(language) |
str |
Optional[str] |
Set output language for artifact generation |
Example:
# Get current language setting
lang = await client.settings.get_output_language()
print(f"Current language: {lang}") # e.g., "en", "ja", "zh_Hans"
# Get server-reported account limits
limits = await client.settings.get_account_limits()
print(f"Notebook limit: {limits.notebook_limit}")
# Get current NotebookLM subscription tier
tier = await client.settings.get_account_tier()
print(f"Account tier: {tier.plan_name or tier.tier}")
# Set language for artifact generation
result = await client.settings.set_output_language("ja") # Japanese
print(f"Language set to: {result}")Important: Language is a GLOBAL setting that affects all notebooks in your account. The tier string is internal NotebookLM metadata; use get_account_limits() for quota decisions because the raw tier name may not match the active notebook/source limits. Supported languages include:
en(English),ja(日本語),zh_Hans(中文简体),zh_Hant(中文繁體)ko(한국어),es(Español),fr(Français),de(Deutsch),pt_BR(Português)- And over 70 other languages
CLI equivalent: Share Commands — notebooklm share status, public, view-level, add, update, remove.
| Method | Parameters | Returns | Description |
|---|---|---|---|
get_status(notebook_id) |
str |
ShareStatus |
Get current sharing configuration |
set_public(notebook_id, public) |
str, bool |
ShareStatus |
Enable/disable public link sharing |
set_view_level(notebook_id, level) |
str, ShareViewLevel |
ShareStatus |
Set what viewers can access |
add_user(notebook_id, email, permission, notify, welcome_message) |
str, str, SharePermission, bool, str |
ShareStatus |
Share with a user |
update_user(notebook_id, email, permission) |
str, str, SharePermission |
ShareStatus |
Update user's permission |
remove_user(notebook_id, email) |
str, str |
ShareStatus |
Remove user's access |
Example:
from notebooklm import SharePermission, ShareViewLevel
# Get current sharing status
status = await client.sharing.get_status(notebook_id)
print(f"Public: {status.is_public}")
print(f"Users: {[u.email for u in status.shared_users]}")
# Enable public sharing (anyone with link)
status = await client.sharing.set_public(notebook_id, True)
print(f"Share URL: {status.share_url}")
# Set view level (what viewers can access)
await client.sharing.set_view_level(notebook_id, ShareViewLevel.CHAT_ONLY)
# Share with specific users
status = await client.sharing.add_user(
notebook_id,
"colleague@example.com",
SharePermission.VIEWER,
notify=True,
welcome_message="Check out my research!"
)
# Update user permission
status = await client.sharing.update_user(
notebook_id,
"colleague@example.com",
SharePermission.EDITOR
)
# Remove user access
status = await client.sharing.remove_user(notebook_id, "colleague@example.com")
# Disable public sharing
status = await client.sharing.set_public(notebook_id, False)Permission Levels:
SharePermission.OWNER- Full control (read-only, cannot be assigned)SharePermission.EDITOR- Can edit notebook contentSharePermission.VIEWER- Read-only access
View Levels:
ShareViewLevel.FULL_NOTEBOOK- Viewers can access chat, sources, and notesShareViewLevel.CHAT_ONLY- Viewers can only access the chat interface
@dataclass
class Notebook:
id: str
title: str
created_at: Optional[datetime]
sources_count: int
is_owner: bool@dataclass
class Source:
id: str
title: Optional[str]
url: Optional[str]
created_at: Optional[datetime]
status: int # 1=processing, 2=ready, 3=error, 5=preparing (defaults to READY)
@property
def kind(self) -> SourceType:
"""Get source type as SourceType enum."""
@property
def is_ready(self) -> bool:
"""status == SourceStatus.READY"""
@property
def is_processing(self) -> bool:
"""status == SourceStatus.PROCESSING"""
@property
def is_error(self) -> bool:
"""status == SourceStatus.ERROR"""Removed in v0.5.0:
Source.source_typewas replaced bySource.kind. See stability.md → Removed in v0.5.0.
Type Identification:
Use the .kind property to identify source types. It returns a SourceType enum which is also a str, enabling both enum and string comparisons:
from notebooklm import SourceType
# Enum comparison (recommended)
if source.kind == SourceType.PDF:
print("This is a PDF")
# String comparison (also works)
if source.kind == "pdf":
print("This is a PDF")
# Use in f-strings
print(f"Type: {source.kind}") # "Type: pdf"@dataclass
class Artifact:
id: str
title: str
_artifact_type: int # Internal type code; field order matters. Access via .kind.
status: int # 1=processing, 2=pending, 3=completed, 4=failed
created_at: Optional[datetime]
url: Optional[str]
_variant: int | None = None # Internal variant for type-4 artifacts (1=flashcards, 2=quiz).
@property
def kind(self) -> ArtifactType:
"""Get artifact type as ArtifactType enum."""
@property
def is_completed(self) -> bool:
"""Check if artifact generation is complete."""
@property
def is_quiz(self) -> bool:
"""Check if this is a quiz artifact."""
@property
def is_flashcards(self) -> bool:
"""Check if this is a flashcards artifact."""
@property
def report_subtype(self) -> str | None:
"""Title-derived report subtype: 'briefing_doc', 'study_guide',
'blog_post', or 'report' for type-2 artifacts; None otherwise.
Use this instead of parsing titles in caller code.
"""Note on _artifact_type / _variant: these are private (leading-underscore) fields with repr=False and are part of the dataclass for from_api_response() round-tripping. Always consume them via the public .kind, .is_quiz, .is_flashcards, and .report_subtype accessors.
Removed in v0.5.0:
Artifact.artifact_typeandArtifact.variantwere replaced byArtifact.kindplus.is_quiz/.is_flashcards. See stability.md → Removed in v0.5.0.
Type Identification:
Use the .kind property to identify artifact types. It returns an ArtifactType enum which is also a str:
from notebooklm import ArtifactType
# Enum comparison (recommended)
if artifact.kind == ArtifactType.AUDIO:
print("This is an audio overview")
# String comparison (also works)
if artifact.kind == "audio":
print("This is an audio overview")
# Check specific types
if artifact.is_quiz:
print("This is a quiz")
elif artifact.is_flashcards:
print("This is a flashcard deck")Returned by poll_status, wait_for_completion, and most artifact generation methods (generate_audio, generate_video, generate_report, generate_quiz, generate_flashcards, generate_slide_deck, generate_infographic, generate_data_table). Note that generate_mind_map returns a dict[str, Any] instead — the mind map is delivered as JSON inline rather than polled.
@dataclass
class GenerationStatus:
task_id: str # Same value as Artifact.id once complete
status: str # "pending" | "in_progress" | "completed" | "failed" | "not_found"
url: str | None = None # Populated for media artifacts when status == "completed"
error: str | None = None
error_code: str | None = None # e.g. "USER_DISPLAYABLE_ERROR" for rate limits
metadata: dict[str, Any] | None = None
@property
def is_complete(self) -> bool:
"""Check if generation is complete."""
@property
def is_failed(self) -> bool:
"""Check if generation failed."""
@property
def is_pending(self) -> bool:
"""Check if generation is pending."""
@property
def is_not_found(self) -> bool:
"""Check if the artifact is absent from the poll response.
Distinct from ``is_pending``: a *pending* artifact exists in the
artifact list and is queued, while *not_found* means the artifact
has either not yet appeared (brief lag after creation) or was
silently removed server-side (e.g. after a daily-quota rejection).
``wait_for_completion`` treats a sustained run of ``not_found``
responses as a failure — see its ``max_not_found`` parameter.
"""
@property
def is_rate_limited(self) -> bool:
"""Check if generation failed due to rate limiting."""url semantics: poll_status populates url for media artifact types (audio, video, infographic, slide-deck PDF) as soon as the server reports the asset as ready. Slide decks expose the PDF URL here; for the editable PowerPoint, use client.artifacts.download_slide_deck(..., output_format="pptx") instead.
status = await client.artifacts.generate_audio(notebook_id)
final = await client.artifacts.wait_for_completion(notebook_id, status.task_id)
if final.is_complete and final.url:
# Stream the asset directly instead of re-fetching artifact metadata
...@dataclass
class AskResult:
answer: str # The answer text with inline citations [1], [2], etc.
conversation_id: str # ID for follow-up questions
turn_number: int # Turn number in conversation
is_follow_up: bool # Whether this was a follow-up question
references: list[ChatReference] # Source references cited in the answer
raw_response: str # First 1000 chars of raw API response
@dataclass
class ChatReference:
source_id: str # UUID of the source
citation_number: int | None # Citation number in answer (1, 2, etc.)
cited_text: str | None # Actual text passage being cited
start_char: int | None # Start position in source content
end_char: int | None # End position in source content
chunk_id: str | None # Internal chunk ID (for debugging)Important: The cited_text field often contains only a snippet or section header, not the full quoted passage. The start_char/end_char positions reference NotebookLM's internal chunked index, which does not directly correspond to positions in the raw fulltext returned by get_fulltext().
Use SourceFulltext.find_citation_context() to locate citations in the fulltext:
fulltext = await client.sources.get_fulltext(notebook_id, ref.source_id)
matches = fulltext.find_citation_context(ref.cited_text) # Returns list[(context, position)]
if matches:
context, pos = matches[0] # First match
if len(matches) > 1:
print(f"Warning: {len(matches)} matches found, using first")
else:
context = None # Not found - may occur if source was modifiedTip: Cache fulltext when processing multiple citations from the same source to avoid repeated API calls.
@dataclass
class ShareStatus:
notebook_id: str # The notebook ID
is_public: bool # Whether publicly accessible
access: ShareAccess # RESTRICTED or ANYONE_WITH_LINK
view_level: ShareViewLevel # FULL_NOTEBOOK or CHAT_ONLY
shared_users: list[SharedUser] # List of users with access
share_url: str | None # Public URL if is_public=True@dataclass
class SharedUser:
email: str # User's email address
permission: SharePermission # OWNER, EDITOR, or VIEWER
display_name: str | None # User's display name
avatar_url: str | None # URL to user's avatar imageReturned by client.settings.get_account_limits(). Use these fields for
quota decisions in preference to the raw tier string — the server-reported
limits are what NotebookLM actually enforces.
@dataclass(frozen=True)
class AccountLimits:
notebook_limit: int | None = None # Max notebooks the account can hold
source_limit: int | None = None # Max sources per notebook
raw_limits: tuple[Any, ...] = () # Untouched RPC payload for forensic useReturned by client.settings.get_account_tier(). Raw tier metadata from
NotebookLM's homepage tier RPC. plan_name is the user-facing label when
available (e.g. "NotebookLM Pro"); tier is the internal identifier
("STANDARD", "PLUS", "PRO", "PRO_DASHER_END_USER", "ULTRA").
@dataclass(frozen=True)
class AccountTier:
tier: str | None = None # Internal tier identifier
plan_name: str | None = None # User-facing plan labeltier = await client.settings.get_account_tier()
label = tier.plan_name or tier.tier or "unknown"@dataclass
class SourceFulltext:
source_id: str # UUID of the source
title: str # Source title
content: str # Full indexed text content
url: str | None # Original URL (if applicable)
char_count: int # Character count
@property
def kind(self) -> SourceType:
"""Get source type as SourceType enum."""
def find_citation_context(
self,
cited_text: str,
context_chars: int = 200,
) -> list[tuple[str, int]]:
"""Search for citation text, return list of (context, position) tuples."""Removed in v0.5.0:
SourceFulltext.source_typewas replaced bySourceFulltext.kind. See stability.md → Removed in v0.5.0.
Type Identification:
Like Source, use the .kind property to get the source type:
fulltext = await client.sources.get_fulltext(nb_id, source_id)
print(f"Content type: {fulltext.kind}") # "pdf", "web_page", etc.class AudioFormat(Enum):
DEEP_DIVE = 1 # In-depth discussion
BRIEF = 2 # Quick summary
CRITIQUE = 3 # Critical analysis
DEBATE = 4 # Two-sided debate
class AudioLength(Enum):
SHORT = 1
DEFAULT = 2
LONG = 3class VideoFormat(Enum):
EXPLAINER = 1
BRIEF = 2
CINEMATIC = 3
class VideoStyle(Enum):
AUTO_SELECT = 1
CUSTOM = 2
CLASSIC = 3
WHITEBOARD = 4
KAWAII = 5
ANIME = 6
WATERCOLOR = 7
RETRO_PRINT = 8
HERITAGE = 9
PAPER_CRAFT = 10class QuizQuantity(Enum):
FEWER = 1
STANDARD = 2
MORE = 2 # Alias of STANDARD used by the CLI/web UI
class QuizDifficulty(Enum):
EASY = 1
MEDIUM = 2
HARD = 3class ReportFormat(str, Enum):
BRIEFING_DOC = "briefing_doc"
STUDY_GUIDE = "study_guide"
BLOG_POST = "blog_post"
CUSTOM = "custom"class InfographicOrientation(Enum):
LANDSCAPE = 1
PORTRAIT = 2
SQUARE = 3
class InfographicDetail(Enum):
CONCISE = 1
STANDARD = 2
DETAILED = 3class SlideDeckFormat(Enum):
DETAILED_DECK = 1
PRESENTER_SLIDES = 2
class SlideDeckLength(Enum):
DEFAULT = 1
SHORT = 2class ExportType(Enum):
DOCS = 1 # Export to Google Docs
SHEETS = 2 # Export to Google Sheetsclass ShareAccess(Enum):
RESTRICTED = 0 # Only explicitly shared users
ANYONE_WITH_LINK = 1 # Public link access
class ShareViewLevel(Enum):
FULL_NOTEBOOK = 0 # Chat + sources + notes
CHAT_ONLY = 1 # Chat interface only
class SharePermission(Enum):
OWNER = 1 # Full control (read-only, cannot assign)
EDITOR = 2 # Can edit notebook
VIEWER = 3 # Read-only accessclass SourceType(str, Enum):
"""Source types - use with source.kind property.
This is a str enum, enabling both enum and string comparisons:
source.kind == SourceType.PDF # True
source.kind == "pdf" # Also True
"""
GOOGLE_DOCS = "google_docs"
GOOGLE_SLIDES = "google_slides"
GOOGLE_SPREADSHEET = "google_spreadsheet"
PDF = "pdf"
PASTED_TEXT = "pasted_text"
WEB_PAGE = "web_page"
GOOGLE_DRIVE_AUDIO = "google_drive_audio"
GOOGLE_DRIVE_VIDEO = "google_drive_video"
YOUTUBE = "youtube"
MARKDOWN = "markdown"
DOCX = "docx"
CSV = "csv"
EPUB = "epub"
IMAGE = "image"
MEDIA = "media"
UNKNOWN = "unknown"
class ArtifactType(str, Enum):
"""Artifact types - use with artifact.kind property.
This is a str enum that hides internal variant complexity.
Quizzes and flashcards are distinguished automatically.
"""
AUDIO = "audio"
VIDEO = "video"
REPORT = "report"
QUIZ = "quiz"
FLASHCARDS = "flashcards"
MIND_MAP = "mind_map"
INFOGRAPHIC = "infographic"
SLIDE_DECK = "slide_deck"
DATA_TABLE = "data_table"
UNKNOWN = "unknown"
class SourceStatus(Enum):
PROCESSING = 1 # Source is being processed (indexing content)
READY = 2 # Source is ready for use
ERROR = 3 # Source processing failed
PREPARING = 5 # Source is being prepared/uploaded (pre-processing stage)Usage Example:
from notebooklm import SourceType, ArtifactType
# List sources by type using .kind property
sources = await client.sources.list(nb_id)
for src in sources:
if src.kind == SourceType.PDF:
print(f"PDF: {src.title}")
elif src.kind == SourceType.MEDIA:
print(f"Audio/Video: {src.title}")
elif src.kind == SourceType.IMAGE:
print(f"Image (OCR'd): {src.title}")
elif src.kind == SourceType.UNKNOWN:
print(f"Unknown type: {src.title}")
# List artifacts by type using .kind property
artifacts = await client.artifacts.list(nb_id)
for art in artifacts:
if art.kind == ArtifactType.AUDIO:
print(f"Audio: {art.title}")
elif art.kind == ArtifactType.VIDEO:
print(f"Video: {art.title}")
elif art.kind == ArtifactType.QUIZ:
print(f"Quiz: {art.title}")class ChatGoal(Enum):
DEFAULT = 1 # General purpose
CUSTOM = 2 # Uses custom_prompt
LEARNING_GUIDE = 3 # Educational focus
class ChatResponseLength(Enum):
DEFAULT = 1
LONGER = 4
SHORTER = 5
class ChatMode(Enum):
"""Predefined chat modes for common use cases (service-level enum)."""
DEFAULT = "default" # General purpose
LEARNING_GUIDE = "learning_guide" # Educational focus
CONCISE = "concise" # Brief responses
DETAILED = "detailed" # Verbose responsesChatGoal vs ChatMode:
ChatGoalis an RPC-level enum used withclient.chat.configure()for low-level API configurationChatModeis a service-level enum providing predefined configurations for common use cases
For undocumented features, you can make raw RPC calls:
from notebooklm.rpc import RPCMethod
async with await NotebookLMClient.from_storage() as client:
# Each RPCMethod member has its own params shape (a nested list) and
# source_path; mirror the higher-level APIs when in doubt.
result = await client.rpc_call(
RPCMethod.CREATE_NOTEBOOK,
params=["My Notebook", None, None, [2], [1]],
)Google rate limits aggressive API usage:
import asyncio
from notebooklm import RPCError
async def safe_create_notebooks(client, titles):
for title in titles:
try:
await client.notebooks.create(title)
except RPCError:
# Wait and retry on rate limit
await asyncio.sleep(10)
await client.notebooks.create(title)
# Add delay between operations
await asyncio.sleep(2)The chat endpoint supports streaming (internal implementation):
# Standard (non-streaming) - recommended
result = await client.chat.ask(nb_id, "Question")
print(result.answer)
# Streaming is handled internally by the library
# The ask() method returns the complete response