From c949a35c4167ef9cd77c5aa4afa9bba4163b108c Mon Sep 17 00:00:00 2001 From: Denis Zubarev Date: Mon, 25 May 2026 17:43:10 +0300 Subject: [PATCH 1/5] add stream_set_blocking --- .../kphp-light/stdlib/file-functions.txt | 1 + .../stdlib/file/file-system-functions.h | 8 +++++ runtime-light/stdlib/file/resource.h | 6 ++++ runtime-light/streams/stream.h | 31 +++++++++++++++++++ 4 files changed, 46 insertions(+) diff --git a/builtin-functions/kphp-light/stdlib/file-functions.txt b/builtin-functions/kphp-light/stdlib/file-functions.txt index c71f7739fc..a04f361f1a 100644 --- a/builtin-functions/kphp-light/stdlib/file-functions.txt +++ b/builtin-functions/kphp-light/stdlib/file-functions.txt @@ -18,6 +18,7 @@ define('STREAM_CLIENT_CONNECT', 1); define('DEFAULT_SOCKET_TIMEOUT', 60); function stream_socket_client ($url ::: string, &$error_number ::: mixed = TODO, &$error_description ::: mixed = TODO, $timeout ::: float = DEFAULT_SOCKET_TIMEOUT, $flags ::: int = STREAM_CLIENT_CONNECT, $context ::: mixed = null) ::: mixed; +function stream_set_blocking ($stream, $mode ::: bool) ::: bool; function fopen ($filename ::: string, $mode ::: string): mixed; /** @kphp-extern-func-info interruptible */ diff --git a/runtime-light/stdlib/file/file-system-functions.h b/runtime-light/stdlib/file/file-system-functions.h index 6fdef0c134..ce63616b9e 100644 --- a/runtime-light/stdlib/file/file-system-functions.h +++ b/runtime-light/stdlib/file/file-system-functions.h @@ -192,6 +192,14 @@ inline resource f$stream_socket_client(const string& address, std::optional(*std::move(expected)); } +inline bool f$stream_set_blocking(const resource& stream, bool mode) noexcept { + if (auto socket{from_mixed>(stream, {})}; !socket.is_null()) { + socket.get()->set_blocking(mode); + return true; + } + return false; +} + inline Optional f$file_get_contents(const string& stream) noexcept { if (auto sync_resource{from_mixed>(f$fopen(stream, FileSystemImageState::get().READ_MODE), {})}; !sync_resource.is_null()) { diff --git a/runtime-light/stdlib/file/resource.h b/runtime-light/stdlib/file/resource.h index f3eb83e2ea..9acbc0e740 100644 --- a/runtime-light/stdlib/file/resource.h +++ b/runtime-light/stdlib/file/resource.h @@ -658,6 +658,8 @@ class socket : public async_resource { static auto open(std::string_view scheme) noexcept -> std::expected; + auto set_blocking(bool blocking) noexcept -> void; + auto write(std::span buf) noexcept -> kphp::coro::task> override; auto read(std::span buf) noexcept -> kphp::coro::task> override; auto get_contents() noexcept -> kphp::coro::task> override; @@ -689,6 +691,10 @@ inline auto socket::open(std::string_view scheme) noexcept -> std::expected void { + m_stream.set_blocking(blocking); +} + inline auto socket::write(std::span buf) noexcept -> kphp::coro::task> { if (!m_open) [[unlikely]] { co_return std::unexpected{k2::errno_enodev}; diff --git a/runtime-light/streams/stream.h b/runtime-light/streams/stream.h index 43b81b9e41..2582d083d5 100644 --- a/runtime-light/streams/stream.h +++ b/runtime-light/streams/stream.h @@ -27,12 +27,16 @@ namespace kphp::component { class stream { + bool m_non_blocking{false}; k2::descriptor m_descriptor{k2::INVALID_PLATFORM_DESCRIPTOR}; kphp::coro::io_scheduler& m_scheduler{kphp::coro::io_scheduler::get()}; explicit stream(k2::descriptor descriptor) noexcept : m_descriptor(descriptor) {} + auto read_non_blocking(std::span buf) const noexcept -> kphp::coro::task>; + auto write_non_blocking(std::span buf) const noexcept -> kphp::coro::task>; + public: stream(stream&& other) noexcept : m_descriptor(std::exchange(other.m_descriptor, k2::INVALID_PLATFORM_DESCRIPTOR)) {} @@ -60,6 +64,7 @@ class stream { auto descriptor() const noexcept -> k2::descriptor; auto reset(k2::descriptor descriptor) noexcept -> void; auto status() const noexcept -> k2::StreamStatus; + auto set_blocking(bool blocking) noexcept -> void; auto read(std::span buf) const noexcept -> kphp::coro::task>; template> F> @@ -72,6 +77,20 @@ class stream { // ================================================================================================ +inline auto stream::read_non_blocking(std::span buf) const noexcept -> kphp::coro::task> { + if (status().read_status != k2::IOStatus::IOAvailable) { + co_return std::expected{0}; + } + co_return std::expected{k2::read(m_descriptor, buf)}; +} + +inline auto stream::write_non_blocking(std::span buf) const noexcept -> kphp::coro::task> { + if (status().write_status != k2::IOStatus::IOAvailable) { + co_return std::expected{0}; + } + co_return std::expected{k2::write(m_descriptor, buf)}; +} + inline auto stream::open(std::string_view target, k2::stream_kind stream_kind) noexcept -> std::expected { int32_t errc{}; k2::descriptor descriptor{k2::INVALID_PLATFORM_DESCRIPTOR}; @@ -125,7 +144,15 @@ inline auto stream::status() const noexcept -> k2::StreamStatus { return stream_status; } +inline auto stream::set_blocking(bool blocking) noexcept -> void { + m_non_blocking = !blocking; +} + inline auto stream::read(std::span buf) const noexcept -> kphp::coro::task> { + if (m_non_blocking) { + co_return co_await read_non_blocking(buf); + } + for (size_t read{}; read < buf.size();) { switch (co_await m_scheduler.poll(m_descriptor, kphp::coro::poll_op::read)) { case kphp::coro::poll_status::event: @@ -158,6 +185,10 @@ auto stream::read_all(F f) const noexcept -> kphp::coro::task buf) const noexcept -> kphp::coro::task> { + if (m_non_blocking) { + co_return co_await write_non_blocking(buf); + } + for (size_t written{}; written < buf.size();) { switch (co_await m_scheduler.poll(m_descriptor, kphp::coro::poll_op::write)) { case kphp::coro::poll_status::event: From b432987625feeffc3f260a64aae9bfa5b23ec912 Mon Sep 17 00:00:00 2001 From: Denis Zubarev Date: Wed, 27 May 2026 16:58:33 +0300 Subject: [PATCH 2/5] minor fix --- runtime-light/streams/stream.h | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/runtime-light/streams/stream.h b/runtime-light/streams/stream.h index 2582d083d5..d3576e9b0a 100644 --- a/runtime-light/streams/stream.h +++ b/runtime-light/streams/stream.h @@ -34,8 +34,8 @@ class stream { explicit stream(k2::descriptor descriptor) noexcept : m_descriptor(descriptor) {} - auto read_non_blocking(std::span buf) const noexcept -> kphp::coro::task>; - auto write_non_blocking(std::span buf) const noexcept -> kphp::coro::task>; + auto read_non_blocking(std::span buf) const noexcept -> std::expected; + auto write_non_blocking(std::span buf) const noexcept -> std::expected; public: stream(stream&& other) noexcept @@ -77,18 +77,18 @@ class stream { // ================================================================================================ -inline auto stream::read_non_blocking(std::span buf) const noexcept -> kphp::coro::task> { +inline auto stream::read_non_blocking(std::span buf) const noexcept -> std::expected { if (status().read_status != k2::IOStatus::IOAvailable) { - co_return std::expected{0}; + return std::expected{0}; } - co_return std::expected{k2::read(m_descriptor, buf)}; + return k2::read(m_descriptor, buf); } -inline auto stream::write_non_blocking(std::span buf) const noexcept -> kphp::coro::task> { +inline auto stream::write_non_blocking(std::span buf) const noexcept -> std::expected { if (status().write_status != k2::IOStatus::IOAvailable) { - co_return std::expected{0}; + return std::expected{0}; } - co_return std::expected{k2::write(m_descriptor, buf)}; + return k2::write(m_descriptor, buf); } inline auto stream::open(std::string_view target, k2::stream_kind stream_kind) noexcept -> std::expected { @@ -150,7 +150,7 @@ inline auto stream::set_blocking(bool blocking) noexcept -> void { inline auto stream::read(std::span buf) const noexcept -> kphp::coro::task> { if (m_non_blocking) { - co_return co_await read_non_blocking(buf); + co_return read_non_blocking(buf); } for (size_t read{}; read < buf.size();) { @@ -186,7 +186,7 @@ auto stream::read_all(F f) const noexcept -> kphp::coro::task buf) const noexcept -> kphp::coro::task> { if (m_non_blocking) { - co_return co_await write_non_blocking(buf); + co_return write_non_blocking(buf); } for (size_t written{}; written < buf.size();) { From e8f8def67f58d9d0dc0d1b0a008af81700ac0d2c Mon Sep 17 00:00:00 2001 From: Denis Zubarev Date: Thu, 28 May 2026 12:54:18 +0300 Subject: [PATCH 3/5] minor fix --- runtime-light/k2-platform/k2-api.h | 1 + runtime-light/streams/stream.h | 16 ++++++++++++---- 2 files changed, 13 insertions(+), 4 deletions(-) diff --git a/runtime-light/k2-platform/k2-api.h b/runtime-light/k2-platform/k2-api.h index 9eb8f2b445..1b439b7bd9 100644 --- a/runtime-light/k2-platform/k2-api.h +++ b/runtime-light/k2-platform/k2-api.h @@ -37,6 +37,7 @@ inline constexpr size_t DEFAULT_MEMORY_ALIGN = 16; inline constexpr int32_t errno_ok = 0; inline constexpr int32_t errno_e2big = E2BIG; +inline constexpr int32_t errno_ebadfd = EBADF; inline constexpr int32_t errno_ebusy = EBUSY; inline constexpr int32_t errno_enodev = ENODEV; inline constexpr int32_t errno_einval = EINVAL; diff --git a/runtime-light/streams/stream.h b/runtime-light/streams/stream.h index d3576e9b0a..14c4457d12 100644 --- a/runtime-light/streams/stream.h +++ b/runtime-light/streams/stream.h @@ -78,17 +78,25 @@ class stream { // ================================================================================================ inline auto stream::read_non_blocking(std::span buf) const noexcept -> std::expected { - if (status().read_status != k2::IOStatus::IOAvailable) { + switch (status().read_status) { + case k2::IOStatus::IOBlocked: return std::expected{0}; + case k2::IOStatus::IOClosed: + return std::unexpected{k2::errno_ebadfd}; + case k2::IOStatus::IOAvailable: + return k2::read(m_descriptor, buf); } - return k2::read(m_descriptor, buf); } inline auto stream::write_non_blocking(std::span buf) const noexcept -> std::expected { - if (status().write_status != k2::IOStatus::IOAvailable) { + switch (status().write_status) { + case k2::IOStatus::IOBlocked: return std::expected{0}; + case k2::IOStatus::IOClosed: + return std::unexpected{k2::errno_ebadfd}; + case k2::IOStatus::IOAvailable: + return k2::write(m_descriptor, buf); } - return k2::write(m_descriptor, buf); } inline auto stream::open(std::string_view target, k2::stream_kind stream_kind) noexcept -> std::expected { From 54ae50353434a110b5981ede9193b698a76a1d32 Mon Sep 17 00:00:00 2001 From: Denis Zubarev Date: Thu, 28 May 2026 13:01:22 +0300 Subject: [PATCH 4/5] add warning --- runtime-light/stdlib/file/file-system-functions.h | 2 ++ 1 file changed, 2 insertions(+) diff --git a/runtime-light/stdlib/file/file-system-functions.h b/runtime-light/stdlib/file/file-system-functions.h index ce63616b9e..ecd4d291f5 100644 --- a/runtime-light/stdlib/file/file-system-functions.h +++ b/runtime-light/stdlib/file/file-system-functions.h @@ -197,6 +197,8 @@ inline bool f$stream_set_blocking(const resource& stream, bool mode) noexcept { socket.get()->set_blocking(mode); return true; } + + kphp::log::warning("unexpected resource in stream_set_blocking -> {}", stream.to_string().c_str()); return false; } From 7b235a3fd25980e5b3b2966c7f91a23b0798602c Mon Sep 17 00:00:00 2001 From: Denis Zubarev Date: Thu, 28 May 2026 13:01:39 +0300 Subject: [PATCH 5/5] fmt --- runtime-light/stdlib/file/file-system-functions.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runtime-light/stdlib/file/file-system-functions.h b/runtime-light/stdlib/file/file-system-functions.h index ecd4d291f5..ea10a4fa63 100644 --- a/runtime-light/stdlib/file/file-system-functions.h +++ b/runtime-light/stdlib/file/file-system-functions.h @@ -197,7 +197,7 @@ inline bool f$stream_set_blocking(const resource& stream, bool mode) noexcept { socket.get()->set_blocking(mode); return true; } - + kphp::log::warning("unexpected resource in stream_set_blocking -> {}", stream.to_string().c_str()); return false; }