Streaming Queries
Cursor-based streaming via pg-cursor and Effect Stream
Overview
@effect/sql-pg supports streaming query results using pg-cursor, exposing them as a Stream.Stream. This avoids loading entire result sets into memory for large queries.
Key Concepts
How It Works
The .stream property on any SQL statement returns a Stream.Stream<Row, SqlError>:
import { SqlClient } from "@effect/sql"
import { Chunk, Effect, Stream } from "effect"

const program = Effect.gen(function*() {
  const sql = yield* SqlClient.SqlClient
  const rows = yield* sql`SELECT generate_series(1, 3)`.stream.pipe(
    Stream.runCollect,
    Effect.map(Chunk.toReadonlyArray)
  )
  // [{ generate_series: 1 }, { generate_series: 2 }, { generate_series: 3 }]
})
Internal Implementation
Streaming is implemented in ConnectionImpl.executeStream:
- A Scope is acquired
- A Pg.PoolClient is reserved for the duration (or the pre-assigned client is used for transactions)
- A pg-cursor (Cursor) is created with the query
- A scope finalizer closes the cursor
- Results are pulled in chunks of 128 rows via cursor.read(128, ...)
- Stream.repeatEffectChunkOption drives the pull loop until the cursor is exhausted
executeStream(sql, params, transformRows) {
  return Effect.gen(function*() {
    const cursor = client.query(new Cursor(sql, params))
    // Ensure the cursor is closed when the stream's scope ends
    yield* Scope.addFinalizer(scope, Effect.promise(() => cursor.close()))
    // One pull = one cursor.read of up to 128 rows
    const pull = Effect.async<Chunk.Chunk<any>, Option.Option<SqlError>>((resume) => {
      cursor.read(128, (err, rows) => {
        if (err) resume(Effect.fail(Option.some(new SqlError({...}))))
        else if (Arr.isNonEmptyArray(rows)) resume(Effect.succeed(Chunk.unsafeFromArray(rows)))
        else resume(Effect.fail(Option.none())) // stream complete
      })
    })
    return Stream.repeatEffectChunkOption(pull)
  }).pipe(Stream.unwrapScoped)
}
Chunk Size
The cursor reads 128 rows at a time. This is hardcoded and provides a good balance between memory usage and round-trip overhead.
Transform Support
If transformResultNames is configured, the transform is applied to each chunk of rows before they're emitted into the stream.
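For example, a client layer that camel-cases snake_case column names in results. This is a configuration sketch (connection options elided; the exact layer constructor shape may vary across versions):

```typescript
import { PgClient } from "@effect/sql-pg"
import { String } from "effect"

// snake_case columns from Postgres become camelCase row keys;
// during streaming the transform is applied to each 128-row chunk.
const PgLive = PgClient.layer({
  database: "app",
  transformResultNames: String.snakeToCamel
})
```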
Code Examples
Streaming with processing:
const sql = yield* SqlClient.SqlClient
yield* sql`SELECT * FROM large_table`.stream.pipe(
Stream.tap((row) => processRow(row)),
Stream.runDrain
)
Collecting stream results:
const allRows = yield* sql`SELECT * FROM people`.stream.pipe(
Stream.runCollect,
Effect.map(Chunk.toReadonlyArray)
)
Related Files
- src/PgClient.ts - executeStream method in ConnectionImpl
- test/Client.test.ts - Stream test using generate_series