1 change: 1 addition & 0 deletions changes/3547.misc.md
@@ -0,0 +1 @@
+Moved concurrency limits to a global per-event loop setting instead of per-array call.
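The changelog entry above describes the intent: instead of every array-level call passing `config.get("async.concurrency")` to `concurrent_map`, the concurrency cap is applied once per event loop at the store layer, so callers can fan out with plain `asyncio.gather`. A minimal sketch of that idea, assuming a hypothetical per-loop semaphore cache and a `throttled_get` helper (neither appears in this diff):

```python
import asyncio
from collections.abc import Awaitable, Callable

# Hypothetical sketch: one semaphore per running event loop, shared by every
# request issued through the store on that loop.
_semaphores: dict[asyncio.AbstractEventLoop, asyncio.Semaphore] = {}


def _loop_semaphore(limit: int) -> asyncio.Semaphore:
    """Return the semaphore bound to the current event loop, creating it on first use."""
    loop = asyncio.get_running_loop()
    if loop not in _semaphores:
        _semaphores[loop] = asyncio.Semaphore(limit)
    return _semaphores[loop]


async def throttled_get(fetch: Callable[[str], Awaitable[bytes]], key: str, limit: int = 64) -> bytes:
    """Run a single store request under the per-loop concurrency limit."""
    async with _loop_semaphore(limit):
        return await fetch(key)
```

With the limit enforced at this level, higher layers (codec pipeline, `getsize_prefix`, `resize`) can gather an arbitrary number of coroutines without re-specifying a limit per call and still never exceed the store's configured budget.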
25 changes: 8 additions & 17 deletions src/zarr/abc/codec.py
@@ -1,5 +1,6 @@
 from __future__ import annotations
 
+import asyncio
 from abc import abstractmethod
 from collections.abc import Mapping
 from typing import TYPE_CHECKING, Generic, TypeGuard, TypeVar
@@ -8,8 +9,7 @@
 
 from zarr.abc.metadata import Metadata
 from zarr.core.buffer import Buffer, NDBuffer
-from zarr.core.common import NamedConfig, concurrent_map
-from zarr.core.config import config
+from zarr.core.common import NamedConfig
 
 if TYPE_CHECKING:
     from collections.abc import Awaitable, Callable, Iterable
@@ -225,11 +225,8 @@ async def decode_partial(
         -------
         Iterable[NDBuffer | None]
         """
-        return await concurrent_map(
-            list(batch_info),
-            self._decode_partial_single,
-            config.get("async.concurrency"),
-        )
+        # Store handles concurrency limiting internally
+        return await asyncio.gather(*[self._decode_partial_single(*info) for info in batch_info])
 
 
 class ArrayBytesCodecPartialEncodeMixin:
@@ -262,11 +259,8 @@ async def encode_partial(
         The ByteSetter is used to write the necessary bytes and fetch bytes for existing chunk data.
         The chunk spec contains information about the chunk.
         """
-        await concurrent_map(
-            list(batch_info),
-            self._encode_partial_single,
-            config.get("async.concurrency"),
-        )
+        # Store handles concurrency limiting internally
+        await asyncio.gather(*[self._encode_partial_single(*info) for info in batch_info])
 
 
 class CodecPipeline:
@@ -464,11 +458,8 @@ async def _batching_helper(
     func: Callable[[CodecInput, ArraySpec], Awaitable[CodecOutput | None]],
     batch_info: Iterable[tuple[CodecInput | None, ArraySpec]],
 ) -> list[CodecOutput | None]:
-    return await concurrent_map(
-        list(batch_info),
-        _noop_for_none(func),
-        config.get("async.concurrency"),
-    )
+    # Store handles concurrency limiting internally
+    return await asyncio.gather(*[_noop_for_none(func)(chunk, spec) for chunk, spec in batch_info])
 
 
 def _noop_for_none(
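For context on what the removed helper did: `concurrent_map` (from `zarr.core.common`) is roughly a semaphore-bounded `asyncio.gather` that splats each item tuple into positional arguments. An approximate sketch, not the exact implementation:

```python
import asyncio
from collections.abc import Awaitable, Callable, Iterable
from typing import Any, TypeVar

T = TypeVar("T")


async def concurrent_map_sketch(
    items: Iterable[tuple[Any, ...]],
    func: Callable[..., Awaitable[T]],
    limit: int | None = None,
) -> list[T]:
    """Rough equivalent of the removed helper: run func(*item) for every item,
    with at most `limit` coroutines in flight at once."""
    if limit is None:
        return await asyncio.gather(*[func(*item) for item in items])

    sem = asyncio.Semaphore(limit)

    async def run(item: tuple[Any, ...]) -> T:
        async with sem:
            return await func(*item)

    return await asyncio.gather(*[run(item) for item in items])
```

Because each item is unpacked into `func`, callers had to package arguments as tuples (for example the one-element `(x,)` keys in the old `getsize_prefix` below); with plain `asyncio.gather` the coroutines are constructed directly, so that wrapping disappears along with the explicit limit.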
10 changes: 3 additions & 7 deletions src/zarr/abc/store.py
@@ -1,5 +1,6 @@
 from __future__ import annotations
 
+import asyncio
 from abc import ABC, abstractmethod
 from asyncio import gather
 from dataclasses import dataclass
@@ -462,13 +463,8 @@ async def getsize_prefix(self, prefix: str) -> int:
         # improve tail latency and might reduce memory pressure (since not all keys
         # would be in memory at once).
 
-        # avoid circular import
-        from zarr.core.common import concurrent_map
-        from zarr.core.config import config
-
-        keys = [(x,) async for x in self.list_prefix(prefix)]
-        limit = config.get("async.concurrency")
-        sizes = await concurrent_map(keys, self.getsize, limit=limit)
+        keys = [x async for x in self.list_prefix(prefix)]
+        sizes = await asyncio.gather(*[self.getsize(key) for key in keys])
         return sum(sizes)
 
 
28 changes: 12 additions & 16 deletions src/zarr/core/array.py
@@ -1,5 +1,6 @@
 from __future__ import annotations
 
+import asyncio
 import json
 import warnings
 from asyncio import gather
@@ -22,7 +23,6 @@
 import numpy as np
 from typing_extensions import deprecated
 
-import zarr
 from zarr.abc.codec import ArrayArrayCodec, ArrayBytesCodec, BytesBytesCodec, Codec
 from zarr.abc.numcodec import Numcodec, _is_numcodec
 from zarr.codecs._v2 import V2Codec
@@ -60,7 +60,6 @@
     _default_zarr_format,
     _warn_order_kwarg,
     ceildiv,
-    concurrent_map,
     parse_shapelike,
     product,
 )
@@ -1848,13 +1847,12 @@ async def resize(self, new_shape: ShapeLike, delete_outside_chunks: bool = True)
             async def _delete_key(key: str) -> None:
                 await (self.store_path / key).delete()
 
-            await concurrent_map(
-                [
-                    (self.metadata.encode_chunk_key(chunk_coords),)
+            # Store handles concurrency limiting internally
+            await asyncio.gather(
+                *[
+                    _delete_key(self.metadata.encode_chunk_key(chunk_coords))
                     for chunk_coords in old_chunk_coords.difference(new_chunk_coords)
-                ],
-                _delete_key,
-                zarr_config.get("async.concurrency"),
+                ]
             )
 
         # Write new metadata
@@ -4535,21 +4533,19 @@ async def _copy_array_region(
             await result.setitem(chunk_coords, arr)
 
         # Stream data from the source array to the new array
-        await concurrent_map(
-            [(region, data) for region in result._iter_shard_regions()],
-            _copy_array_region,
-            zarr.core.config.config.get("async.concurrency"),
+        # Store handles concurrency limiting internally
+        await asyncio.gather(
+            *[_copy_array_region(region, data) for region in result._iter_shard_regions()]
         )
     else:
 
         async def _copy_arraylike_region(chunk_coords: slice, _data: NDArrayLike) -> None:
            await result.setitem(chunk_coords, _data[chunk_coords])
 
         # Stream data from the source array to the new array
-        await concurrent_map(
-            [(region, data) for region in result._iter_shard_regions()],
-            _copy_arraylike_region,
-            zarr.core.config.config.get("async.concurrency"),
+        # Store handles concurrency limiting internally
+        await asyncio.gather(
+            *[_copy_arraylike_region(region, data) for region in result._iter_shard_regions()]
         )
     return result
 
56 changes: 27 additions & 29 deletions src/zarr/core/codec_pipeline.py
@@ -1,5 +1,6 @@
 from __future__ import annotations
 
+import asyncio
 from dataclasses import dataclass
 from itertools import islice, pairwise
 from typing import TYPE_CHECKING, Any, TypeVar
@@ -14,7 +15,6 @@
     Codec,
     CodecPipeline,
 )
-from zarr.core.common import concurrent_map
 from zarr.core.config import config
 from zarr.core.indexing import SelectorTuple, is_scalar
 from zarr.errors import ZarrUserWarning
@@ -267,10 +267,12 @@ async def read_batch(
                 else:
                     out[out_selection] = fill_value_or_default(chunk_spec)
         else:
-            chunk_bytes_batch = await concurrent_map(
-                [(byte_getter, array_spec.prototype) for byte_getter, array_spec, *_ in batch_info],
-                lambda byte_getter, prototype: byte_getter.get(prototype),
-                config.get("async.concurrency"),
+            # Store handles concurrency limiting internally
+            chunk_bytes_batch = await asyncio.gather(
+                *[
+                    byte_getter.get(array_spec.prototype)
+                    for byte_getter, array_spec, *_ in batch_info
+                ]
             )
             chunk_array_batch = await self.decode_batch(
                 [
@@ -368,16 +370,15 @@ async def _read_key(
                 return await byte_setter.get(prototype=prototype)
 
             chunk_bytes_batch: Iterable[Buffer | None]
-            chunk_bytes_batch = await concurrent_map(
-                [
-                    (
+            # Store handles concurrency limiting internally
+            chunk_bytes_batch = await asyncio.gather(
+                *[
+                    _read_key(
                         None if is_complete_chunk else byte_setter,
                         chunk_spec.prototype,
                     )
                     for byte_setter, chunk_spec, chunk_selection, _, is_complete_chunk in batch_info
-                ],
-                _read_key,
-                config.get("async.concurrency"),
+                ]
             )
             chunk_array_decoded = await self.decode_batch(
                 [
@@ -435,15 +436,14 @@ async def _write_key(byte_setter: ByteSetter, chunk_bytes: Buffer | None) -> None:
                 else:
                     await byte_setter.set(chunk_bytes)
 
-            await concurrent_map(
-                [
-                    (byte_setter, chunk_bytes)
+            # Store handles concurrency limiting internally
+            await asyncio.gather(
+                *[
+                    _write_key(byte_setter, chunk_bytes)
                     for chunk_bytes, (byte_setter, *_) in zip(
                         chunk_bytes_batch, batch_info, strict=False
                     )
-                ],
-                _write_key,
-                config.get("async.concurrency"),
+                ]
             )
 
     async def decode(
@@ -470,13 +470,12 @@ async def read(
         out: NDBuffer,
         drop_axes: tuple[int, ...] = (),
     ) -> None:
-        await concurrent_map(
-            [
-                (single_batch_info, out, drop_axes)
+        # Process mini-batches concurrently - stores handle I/O concurrency internally
+        await asyncio.gather(
+            *[
+                self.read_batch(single_batch_info, out, drop_axes)
                 for single_batch_info in batched(batch_info, self.batch_size)
-            ],
-            self.read_batch,
-            config.get("async.concurrency"),
+            ]
         )
 
     async def write(
@@ -485,13 +484,12 @@ async def write(
         value: NDBuffer,
         drop_axes: tuple[int, ...] = (),
     ) -> None:
-        await concurrent_map(
-            [
-                (single_batch_info, value, drop_axes)
+        # Process mini-batches concurrently - stores handle I/O concurrency internally
+        await asyncio.gather(
+            *[
+                self.write_batch(single_batch_info, value, drop_axes)
                 for single_batch_info in batched(batch_info, self.batch_size)
-            ],
-            self.write_batch,
-            config.get("async.concurrency"),
+            ]
         )
 
 
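The `read` and `write` changes above keep the existing mini-batching over `batched(batch_info, self.batch_size)` and simply gather one coroutine per mini-batch. A small, self-contained illustration of the same dispatch shape, with `process_batch` standing in for `read_batch`/`write_batch` (the names and sleep-based work here are hypothetical):

```python
import asyncio
from collections.abc import Iterable, Iterator
from itertools import islice
from typing import TypeVar

T = TypeVar("T")


def batched(iterable: Iterable[T], n: int) -> Iterator[tuple[T, ...]]:
    """Split an iterable into tuples of at most n items (itertools.batched exists in 3.12+)."""
    it = iter(iterable)
    while chunk := tuple(islice(it, n)):
        yield chunk


async def process_batch(batch: tuple[int, ...]) -> None:
    # Stand-in for read_batch/write_batch: pretend each batch is one I/O round trip.
    await asyncio.sleep(0)
    print("processed", batch)


async def main() -> None:
    work = range(10)
    # One coroutine per mini-batch, all gathered at once; nothing here caps how
    # many run concurrently - that throttling is left to the store.
    await asyncio.gather(*[process_batch(b) for b in batched(work, 4)])


asyncio.run(main())
```

As in the diff, the caller no longer passes a concurrency limit; any bound on in-flight I/O is expected to come from the store that ultimately services the requests.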