diff --git a/builtin-functions/kphp-light/stdlib/file-functions.txt b/builtin-functions/kphp-light/stdlib/file-functions.txt index c71f7739fc..a04f361f1a 100644 --- a/builtin-functions/kphp-light/stdlib/file-functions.txt +++ b/builtin-functions/kphp-light/stdlib/file-functions.txt @@ -18,6 +18,7 @@ define('STREAM_CLIENT_CONNECT', 1); define('DEFAULT_SOCKET_TIMEOUT', 60); function stream_socket_client ($url ::: string, &$error_number ::: mixed = TODO, &$error_description ::: mixed = TODO, $timeout ::: float = DEFAULT_SOCKET_TIMEOUT, $flags ::: int = STREAM_CLIENT_CONNECT, $context ::: mixed = null) ::: mixed; +function stream_set_blocking ($stream, $mode ::: bool) ::: bool; function fopen ($filename ::: string, $mode ::: string): mixed; /** @kphp-extern-func-info interruptible */ diff --git a/runtime-light/k2-platform/k2-api.h b/runtime-light/k2-platform/k2-api.h index 9eb8f2b445..1b439b7bd9 100644 --- a/runtime-light/k2-platform/k2-api.h +++ b/runtime-light/k2-platform/k2-api.h @@ -37,6 +37,7 @@ inline constexpr size_t DEFAULT_MEMORY_ALIGN = 16; inline constexpr int32_t errno_ok = 0; inline constexpr int32_t errno_e2big = E2BIG; +inline constexpr int32_t errno_ebadfd = EBADF; inline constexpr int32_t errno_ebusy = EBUSY; inline constexpr int32_t errno_enodev = ENODEV; inline constexpr int32_t errno_einval = EINVAL; diff --git a/runtime-light/stdlib/file/file-system-functions.h b/runtime-light/stdlib/file/file-system-functions.h index 6fdef0c134..ea10a4fa63 100644 --- a/runtime-light/stdlib/file/file-system-functions.h +++ b/runtime-light/stdlib/file/file-system-functions.h @@ -192,6 +192,16 @@ inline resource f$stream_socket_client(const string& address, std::optional(*std::move(expected)); } +inline bool f$stream_set_blocking(const resource& stream, bool mode) noexcept { + if (auto socket{from_mixed>(stream, {})}; !socket.is_null()) { + socket.get()->set_blocking(mode); + return true; + } + + kphp::log::warning("unexpected resource in stream_set_blocking -> {}", stream.to_string().c_str()); + return false; +} + inline Optional f$file_get_contents(const string& stream) noexcept { if (auto sync_resource{from_mixed>(f$fopen(stream, FileSystemImageState::get().READ_MODE), {})}; !sync_resource.is_null()) { diff --git a/runtime-light/stdlib/file/resource.h b/runtime-light/stdlib/file/resource.h index f3eb83e2ea..9acbc0e740 100644 --- a/runtime-light/stdlib/file/resource.h +++ b/runtime-light/stdlib/file/resource.h @@ -658,6 +658,8 @@ class socket : public async_resource { static auto open(std::string_view scheme) noexcept -> std::expected; + auto set_blocking(bool blocking) noexcept -> void; + auto write(std::span buf) noexcept -> kphp::coro::task> override; auto read(std::span buf) noexcept -> kphp::coro::task> override; auto get_contents() noexcept -> kphp::coro::task> override; @@ -689,6 +691,10 @@ inline auto socket::open(std::string_view scheme) noexcept -> std::expected void { + m_stream.set_blocking(blocking); +} + inline auto socket::write(std::span buf) noexcept -> kphp::coro::task> { if (!m_open) [[unlikely]] { co_return std::unexpected{k2::errno_enodev}; diff --git a/runtime-light/streams/stream.h b/runtime-light/streams/stream.h index 43b81b9e41..14c4457d12 100644 --- a/runtime-light/streams/stream.h +++ b/runtime-light/streams/stream.h @@ -27,12 +27,16 @@ namespace kphp::component { class stream { + bool m_non_blocking{false}; k2::descriptor m_descriptor{k2::INVALID_PLATFORM_DESCRIPTOR}; kphp::coro::io_scheduler& m_scheduler{kphp::coro::io_scheduler::get()}; explicit stream(k2::descriptor descriptor) noexcept : m_descriptor(descriptor) {} + auto read_non_blocking(std::span buf) const noexcept -> std::expected; + auto write_non_blocking(std::span buf) const noexcept -> std::expected; + public: stream(stream&& other) noexcept : m_descriptor(std::exchange(other.m_descriptor, k2::INVALID_PLATFORM_DESCRIPTOR)) {} @@ -60,6 +64,7 @@ class stream { auto descriptor() const noexcept -> k2::descriptor; auto reset(k2::descriptor descriptor) noexcept -> void; auto status() const noexcept -> k2::StreamStatus; + auto set_blocking(bool blocking) noexcept -> void; auto read(std::span buf) const noexcept -> kphp::coro::task>; template> F> @@ -72,6 +77,28 @@ class stream { // ================================================================================================ +inline auto stream::read_non_blocking(std::span buf) const noexcept -> std::expected { + switch (status().read_status) { + case k2::IOStatus::IOBlocked: + return std::expected{0}; + case k2::IOStatus::IOClosed: + return std::unexpected{k2::errno_ebadfd}; + case k2::IOStatus::IOAvailable: + return k2::read(m_descriptor, buf); + } +} + +inline auto stream::write_non_blocking(std::span buf) const noexcept -> std::expected { + switch (status().write_status) { + case k2::IOStatus::IOBlocked: + return std::expected{0}; + case k2::IOStatus::IOClosed: + return std::unexpected{k2::errno_ebadfd}; + case k2::IOStatus::IOAvailable: + return k2::write(m_descriptor, buf); + } +} + inline auto stream::open(std::string_view target, k2::stream_kind stream_kind) noexcept -> std::expected { int32_t errc{}; k2::descriptor descriptor{k2::INVALID_PLATFORM_DESCRIPTOR}; @@ -125,7 +152,15 @@ inline auto stream::status() const noexcept -> k2::StreamStatus { return stream_status; } +inline auto stream::set_blocking(bool blocking) noexcept -> void { + m_non_blocking = !blocking; +} + inline auto stream::read(std::span buf) const noexcept -> kphp::coro::task> { + if (m_non_blocking) { + co_return read_non_blocking(buf); + } + for (size_t read{}; read < buf.size();) { switch (co_await m_scheduler.poll(m_descriptor, kphp::coro::poll_op::read)) { case kphp::coro::poll_status::event: @@ -158,6 +193,10 @@ auto stream::read_all(F f) const noexcept -> kphp::coro::task buf) const noexcept -> kphp::coro::task> { + if (m_non_blocking) { + co_return write_non_blocking(buf); + } + for (size_t written{}; written < buf.size();) { switch (co_await m_scheduler.poll(m_descriptor, kphp::coro::poll_op::write)) { case kphp::coro::poll_status::event: