Skip to content

Commit e6490f2

Browse files
authored
refactor: createAsyncIteratorObject -> AsyncIteratorClass for endability (#619)
<!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit - **Refactor** - Introduced a dedicated class to manage async iteration, enhancing encapsulation and maintainability. - Updated multiple modules to utilize the new async iterator class for handling event streams and iterators. - Modified function signatures to return the new async iterator class type. - **Tests** - Revised test suites to accommodate the new async iterator class. - Removed a test concerning a deprecated async disposal symbol fallback. <!-- end of auto-generated comment: release notes by coderabbit.ai -->
1 parent 55474d7 commit e6490f2

7 files changed

Lines changed: 90 additions & 108 deletions

File tree

packages/client/src/event-iterator.test.ts

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -152,11 +152,7 @@ describe('mapEventIterator', () => {
152152
})
153153

154154
await mapped.next()
155-
const error = new Error('TEST')
156-
await expect(mapped.throw(new Error('TEST'))).rejects.toEqual({ mapped: error })
157-
158-
expect(map).toHaveBeenCalledTimes(2)
159-
155+
await expect(mapped.throw(new Error('TEST'))).rejects.toThrow()
160156
expect(finished).toBe(true)
161157
})
162158
})
Lines changed: 14 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { isTypescriptObject } from '@orpc/shared'
1+
import { AsyncIteratorClass, isTypescriptObject } from '@orpc/shared'
22
import { getEventMeta, withEventMeta } from '@orpc/standard-server'
33

44
export function mapEventIterator<TYield, TReturn, TNext, TMap = TYield | TReturn>(
@@ -7,27 +7,21 @@ export function mapEventIterator<TYield, TReturn, TNext, TMap = TYield | TReturn
77
value: (value: NoInfer<TYield | TReturn>, done: boolean | undefined) => Promise<TMap>
88
error: (error: unknown) => Promise<unknown>
99
},
10-
): AsyncGenerator<TMap, TMap, TNext> {
11-
return (async function* () {
10+
): AsyncIteratorClass<TMap, TMap, TNext> {
11+
return new AsyncIteratorClass(async () => {
1212
try {
13-
while (true) {
14-
const { done, value } = await iterator.next()
13+
const { done, value } = await iterator.next()
1514

16-
let mappedValue = await maps.value(value, done)
15+
let mappedValue = await maps.value(value, done)
1716

18-
if (mappedValue !== value) {
19-
const meta = getEventMeta(value)
20-
if (meta && isTypescriptObject(mappedValue)) {
21-
mappedValue = withEventMeta(mappedValue, meta)
22-
}
17+
if (mappedValue !== value) {
18+
const meta = getEventMeta(value)
19+
if (meta && isTypescriptObject(mappedValue)) {
20+
mappedValue = withEventMeta(mappedValue, meta)
2321
}
24-
25-
if (done) {
26-
return mappedValue
27-
}
28-
29-
yield mappedValue
3022
}
23+
24+
return { done, value: mappedValue }
3125
}
3226
catch (error) {
3327
let mappedError = await maps.error(error)
@@ -41,8 +35,9 @@ export function mapEventIterator<TYield, TReturn, TNext, TMap = TYield | TReturn
4135

4236
throw mappedError
4337
}
44-
finally {
38+
}, async (reason) => {
39+
if (reason !== 'next') {
4540
await iterator.return?.()
4641
}
47-
})()
42+
})
4843
}

packages/shared/src/event-publisher.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { createAsyncIteratorObject } from './iterator'
1+
import { AsyncIteratorClass } from './iterator'
22

33
export interface EventPublisherOptions {
44
/**
@@ -130,7 +130,7 @@ export class EventPublisher<T extends Record<PropertyKey, any>> {
130130

131131
signal?.addEventListener('abort', abortListener, { once: true })
132132

133-
return createAsyncIteratorObject(async () => {
133+
return new AsyncIteratorClass(async () => {
134134
if (signal?.aborted) {
135135
throw signal.reason
136136
}

packages/shared/src/iterator.test.ts

Lines changed: 3 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { createAsyncIteratorObject, isAsyncIteratorObject, replicateAsyncIterator } from './iterator'
1+
import { AsyncIteratorClass, isAsyncIteratorObject, replicateAsyncIterator } from './iterator'
22

33
it('isAsyncIteratorObject', () => {
44
expect(isAsyncIteratorObject(null)).toBe(false)
@@ -15,15 +15,15 @@ it('isAsyncIteratorObject', () => {
1515
expect(isAsyncIteratorObject(gen2())).toBe(false)
1616
})
1717

18-
describe('createAsyncIteratorObject', () => {
18+
describe('asyncIteratorClass', () => {
1919
const next = vi.fn()
2020
const cleanup = vi.fn()
2121
let iterator: AsyncGenerator
2222

2323
beforeEach(() => {
2424
next.mockReset()
2525
cleanup.mockReset()
26-
iterator = createAsyncIteratorObject(next, cleanup)
26+
iterator = new AsyncIteratorClass(next, cleanup)
2727
})
2828

2929
afterEach(async () => {
@@ -161,28 +161,6 @@ describe('createAsyncIteratorObject', () => {
161161
await iterator.return(undefined)
162162
})
163163

164-
it('should fallback to random symbol if Symbol.asyncDispose is not available', async () => {
165-
const OriginalSymbol = globalThis.Symbol
166-
const fallbackSymbol = Symbol.for('asyncDispose')
167-
168-
globalThis.Symbol = {
169-
for: (name: string) => {
170-
expect(name).toBe('asyncDispose')
171-
return fallbackSymbol
172-
},
173-
} as any
174-
175-
const fallback = createAsyncIteratorObject(() => {
176-
throw new Error('Should not be called')
177-
}, async () => {})
178-
179-
expect(typeof (fallback as any)[fallbackSymbol]).toBe('function')
180-
181-
globalThis.Symbol = OriginalSymbol
182-
183-
await iterator.return(undefined)
184-
})
185-
186164
it('should call cleanup("dispose") when disposed', async () => {
187165
await Promise.all([
188166
(iterator as any)[Symbol.asyncDispose](),

packages/shared/src/iterator.ts

Lines changed: 62 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -9,86 +9,99 @@ export function isAsyncIteratorObject(maybe: unknown): maybe is AsyncIteratorObj
99
return Symbol.asyncIterator in maybe && typeof maybe[Symbol.asyncIterator] === 'function'
1010
}
1111

12-
export interface CreateAsyncIteratorObjectCleanupFn {
12+
export interface AsyncIteratorClassNextFn<T, TReturn> {
13+
(): Promise<IteratorResult<T, TReturn>>
14+
}
15+
16+
export interface AsyncIteratorClassCleanupFn {
1317
(reason: 'return' | 'throw' | 'next' | 'dispose'): Promise<void>
1418
}
1519

16-
export function createAsyncIteratorObject<T, TReturn, TNext>(
17-
next: () => Promise<IteratorResult<T, TReturn>>,
18-
cleanup: CreateAsyncIteratorObjectCleanupFn,
19-
): AsyncIteratorObject<T, TReturn, TNext> & AsyncGenerator<T, TReturn, TNext> {
20-
let isExecuteComplete = false
21-
let isDone = false
20+
const fallbackAsyncDisposeSymbol: unique symbol = Symbol.for('asyncDispose')
21+
const asyncDisposeSymbol: typeof Symbol extends { asyncDispose: infer T } ? T : typeof fallbackAsyncDisposeSymbol = (Symbol as any).asyncDispose ?? fallbackAsyncDisposeSymbol
22+
23+
export class AsyncIteratorClass<T, TReturn = unknown, TNext = unknown> implements AsyncIteratorObject<T, TReturn, TNext>, AsyncGenerator<T, TReturn, TNext> {
24+
#isDone = false
25+
#isExecuteComplete = false
26+
#cleanup: AsyncIteratorClassCleanupFn
27+
#next: AsyncIteratorClassNextFn<T, TReturn>
2228

23-
const iterator = {
24-
next: sequential(async () => {
25-
if (isDone) {
29+
constructor(next: AsyncIteratorClassNextFn<T, TReturn>, cleanup: AsyncIteratorClassCleanupFn) {
30+
this.#cleanup = cleanup
31+
this.#next = sequential(async () => {
32+
if (this.#isDone) {
2633
return { done: true, value: undefined as any }
2734
}
2835

2936
try {
3037
const result = await next()
3138

3239
if (result.done) {
33-
isDone = true
40+
this.#isDone = true
3441
}
3542

3643
return result
3744
}
3845
catch (err) {
39-
isDone = true
46+
this.#isDone = true
4047
throw err
4148
}
4249
finally {
43-
if (isDone && !isExecuteComplete) {
44-
isExecuteComplete = true
45-
await cleanup('next')
50+
if (this.#isDone && !this.#isExecuteComplete) {
51+
this.#isExecuteComplete = true
52+
await this.#cleanup('next')
4653
}
4754
}
48-
}),
49-
async return(value: any) {
50-
isDone = true
51-
if (!isExecuteComplete) {
52-
isExecuteComplete = true
53-
await cleanup('return')
54-
}
55+
})
56+
}
5557

56-
return { done: true, value }
57-
},
58-
async throw(err: any) {
59-
isDone = true
60-
if (!isExecuteComplete) {
61-
isExecuteComplete = true
62-
await cleanup('throw')
63-
}
58+
next(): Promise<IteratorResult<T, TReturn>> {
59+
return this.#next()
60+
}
6461

65-
throw err
66-
},
67-
/**
68-
* asyncDispose symbol only available in esnext, we should fallback to Symbol.for('asyncDispose')
69-
*/
70-
async [(Symbol as any).asyncDispose as typeof Symbol extends { asyncDispose: infer T } ? T : any ?? Symbol.for('asyncDispose')]() {
71-
isDone = true
72-
if (!isExecuteComplete) {
73-
isExecuteComplete = true
74-
await cleanup('dispose')
75-
}
76-
},
77-
[Symbol.asyncIterator]() {
78-
return iterator
79-
},
62+
async return(value?: any): Promise<IteratorResult<T, TReturn>> {
63+
this.#isDone = true
64+
if (!this.#isExecuteComplete) {
65+
this.#isExecuteComplete = true
66+
await this.#cleanup('return')
67+
}
68+
69+
return { done: true, value }
70+
}
71+
72+
async throw(err: any): Promise<IteratorResult<T, TReturn>> {
73+
this.#isDone = true
74+
if (!this.#isExecuteComplete) {
75+
this.#isExecuteComplete = true
76+
await this.#cleanup('throw')
77+
}
78+
79+
throw err
8080
}
8181

82-
return iterator
82+
/**
83+
* asyncDispose symbol only available in esnext, we should fallback to Symbol.for('asyncDispose')
84+
*/
85+
async [asyncDisposeSymbol](): Promise<void> {
86+
this.#isDone = true
87+
if (!this.#isExecuteComplete) {
88+
this.#isExecuteComplete = true
89+
await this.#cleanup('dispose')
90+
}
91+
}
92+
93+
[Symbol.asyncIterator](): this {
94+
return this
95+
}
8396
}
8497

8598
export function replicateAsyncIterator<T, TReturn, TNext>(
8699
source: AsyncIterator<T, TReturn, TNext>,
87100
count: number,
88-
): (AsyncIteratorObject<T, TReturn, TNext> & AsyncGenerator<T, TReturn, TNext>)[] {
101+
): (AsyncIteratorClass<T, TReturn, TNext>)[] {
89102
const queue = new AsyncIdQueue<IteratorResult<T, TReturn>>()
90103

91-
const replicated: (AsyncIteratorObject<T, TReturn, TNext> & AsyncGenerator<T, TReturn, TNext>)[] = []
104+
const replicated: AsyncIteratorClass<T, TReturn, TNext>[] = []
92105

93106
let error: undefined | { value: unknown }
94107

@@ -115,7 +128,7 @@ export function replicateAsyncIterator<T, TReturn, TNext>(
115128

116129
for (let id = 0; id < count; id++) {
117130
queue.open(id)
118-
replicated.push(createAsyncIteratorObject(
131+
replicated.push(new AsyncIteratorClass(
119132
() => {
120133
start()
121134

packages/standard-server-fetch/src/event-iterator.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { createAsyncIteratorObject, isTypescriptObject, parseEmptyableJSON, stringifyJSON } from '@orpc/shared'
1+
import { AsyncIteratorClass, isTypescriptObject, parseEmptyableJSON, stringifyJSON } from '@orpc/shared'
22
import {
33
encodeEventMessage,
44
ErrorEvent,
@@ -9,14 +9,14 @@ import {
99

1010
export function toEventIterator(
1111
stream: ReadableStream<Uint8Array> | null,
12-
): AsyncIteratorObject<unknown | void, unknown | void, void> & AsyncGenerator<unknown | void, unknown | void, void> {
12+
): AsyncIteratorClass<unknown> {
1313
const eventStream = stream
1414
?.pipeThrough(new TextDecoderStream())
1515
.pipeThrough(new EventDecoderStream())
1616

1717
const reader = eventStream?.getReader()
1818

19-
return createAsyncIteratorObject(async () => {
19+
return new AsyncIteratorClass(async () => {
2020
while (true) {
2121
if (reader === undefined) {
2222
return { done: true, value: undefined }

packages/standard-server-peer/src/event-iterator.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,15 @@
1-
import type { CreateAsyncIteratorObjectCleanupFn } from '@orpc/shared'
1+
import type { AsyncIteratorClassCleanupFn } from '@orpc/shared'
22
import type { AsyncIdQueue } from '../../shared/src/queue'
33
import type { EventIteratorPayload } from './codec'
4-
import { createAsyncIteratorObject, isTypescriptObject } from '@orpc/shared'
4+
import { AsyncIteratorClass, isTypescriptObject } from '@orpc/shared'
55
import { ErrorEvent, getEventMeta, withEventMeta } from '@orpc/standard-server'
66

77
export function toEventIterator(
88
queue: AsyncIdQueue<EventIteratorPayload>,
99
id: number,
10-
cleanup: CreateAsyncIteratorObjectCleanupFn,
11-
): AsyncGenerator {
12-
return createAsyncIteratorObject(async () => {
10+
cleanup: AsyncIteratorClassCleanupFn,
11+
): AsyncIteratorClass<unknown> {
12+
return new AsyncIteratorClass(async () => {
1313
const item = await queue.pull(id)
1414

1515
switch (item.event) {

0 commit comments

Comments
 (0)