From 82cad90cc61eadc3d7dd5120616165309961acd2 Mon Sep 17 00:00:00 2001 From: Alexander Karashchuk Date: Sat, 20 Apr 2024 21:13:38 +0300 Subject: [PATCH 1/3] Removed deprecated size and change connection security protocol --- src/Statement/CreateConnection.php | 11 +++++++---- src/Statement/CreateSink.php | 4 ++-- src/Statement/CreateSource.php | 21 ++------------------- 3 files changed, 11 insertions(+), 25 deletions(-) diff --git a/src/Statement/CreateConnection.php b/src/Statement/CreateConnection.php index 5f0bedb..3b64d4d 100644 --- a/src/Statement/CreateConnection.php +++ b/src/Statement/CreateConnection.php @@ -42,11 +42,14 @@ private function __construct(string $name, string $definition) public static function kafka(string $name, string ...$brokers): self { if (count($brokers) > 1) { - $definition = sprintf('KAFKA (BROKERS (%s))', implode(', ', array_map(static function (string $dsn) { - return sprintf('\'%s\'', $dsn); - }, $brokers))); + $definition = sprintf( + "KAFKA (BROKERS (%s), SECURITY PROTOCOL = 'PLAINTEXT')", + implode(', ', array_map(static function (string $dsn) { + return sprintf('\'%s\'', $dsn); + }, $brokers)) + ); } else { - $definition = sprintf('KAFKA (BROKER \'%s\')', current($brokers)); + $definition = sprintf("KAFKA (BROKER '%s', SECURITY PROTOCOL = 'PLAINTEXT')", current($brokers)); } return new self($name, $definition); diff --git a/src/Statement/CreateSink.php b/src/Statement/CreateSink.php index b069e46..ccc55a4 100644 --- a/src/Statement/CreateSink.php +++ b/src/Statement/CreateSink.php @@ -109,8 +109,8 @@ public function __toString(): string $query .= ' IF NOT EXISTS'; } - $query .= ' %s FROM %s INTO %s WITH (SIZE = \'%s\')'; + $query .= ' %s FROM %s INTO %s'; - return sprintf($query, $this->name, $this->from, $this->definition, $this->size); + return sprintf($query, $this->name, $this->from, $this->definition); } } diff --git a/src/Statement/CreateSource.php b/src/Statement/CreateSource.php index e2bae28..9b14656 100644 --- a/src/Statement/CreateSource.php +++ b/src/Statement/CreateSource.php @@ -19,11 +19,6 @@ class CreateSource implements Command */ private $definition; - /** - * @var string - */ - private $size = '1'; - /** * @var bool */ @@ -71,18 +66,6 @@ public static function postgres(string $name, string $connection, string $public return new self($name, $definition); } - /** - * @param string $size - * - * @return self - */ - public function size(string $size): self - { - $this->size = $size; - - return $this; - } - /** * @return self */ @@ -106,8 +89,8 @@ public function __toString(): string $query .= ' IF NOT EXISTS'; } - $query .= ' %s FROM %s WITH (SIZE = \'%s\')'; + $query .= ' %s FROM %s'; - return sprintf($query, $this->name, $this->definition, $this->size); + return sprintf($query, $this->name, $this->definition); } } From 3067c78cd4b524daa00ed2460f588a1068351e99 Mon Sep 17 00:00:00 2001 From: Alexander Karashchuk Date: Wed, 1 Apr 2026 10:54:05 +0300 Subject: [PATCH 2/3] Bump psr/log (#4) --- composer.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/composer.json b/composer.json index 229a704..0710576 100644 --- a/composer.json +++ b/composer.json @@ -7,7 +7,7 @@ "require": { "php": "^7.2 || ^8.0", "ext-pdo": "*", - "psr/log": "^1.1 || ^2.0" + "psr/log": "^1.1 || ^2.0 || ^3.0" }, "require-dev": { "phpunit/phpunit": "^8.5 || ^9.0", From 5d6401693b0d228b8d18b21b78384f805a30decd Mon Sep 17 00:00:00 2001 From: Alexander Karashchuk Date: Wed, 8 Apr 2026 10:57:09 +0300 Subject: [PATCH 3/3] Fixed nullable (#5) --- src/Connection.php | 2 +- src/Statement/CreateSink.php | 2 +- src/Statement/CreateSource.php | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/Connection.php b/src/Connection.php index 2e82f4a..e4c5475 100644 --- a/src/Connection.php +++ b/src/Connection.php @@ -47,7 +47,7 @@ private function __construct(PDO $connection, LoggerInterface $logger) * * @return self */ - public static function open(string $url, LoggerInterface $logger = null): self + public static function open(string $url, ?LoggerInterface $logger = null): self { $parts = array_replace(self::DEFAULTS, array_filter(parse_url($url) ?: [])); $dsn = sprintf(self::DSN_TEMPLATE, $parts['host'], $parts['port'], trim((string) $parts['path'], '/')); diff --git a/src/Statement/CreateSink.php b/src/Statement/CreateSink.php index ccc55a4..563588c 100644 --- a/src/Statement/CreateSink.php +++ b/src/Statement/CreateSink.php @@ -62,7 +62,7 @@ public static function kafka( string $connection, string $topic, array $keys = [], - Envelope $envelope = null + ?Envelope $envelope = null ): self { $envelope = $envelope ?? Envelope::default(); $definition = sprintf('KAFKA CONNECTION %s (TOPIC \'%s\') ', $connection, $topic); diff --git a/src/Statement/CreateSource.php b/src/Statement/CreateSource.php index 9b14656..1de49d9 100644 --- a/src/Statement/CreateSource.php +++ b/src/Statement/CreateSource.php @@ -42,7 +42,7 @@ private function __construct(string $name, string $definition) * * @return self */ - public static function kafka(string $name, string $connection, string $topic, Envelope $envelope = null): self + public static function kafka(string $name, string $connection, string $topic, ?Envelope $envelope = null): self { $envelope = $envelope ?? Envelope::default(); $definition = sprintf('KAFKA CONNECTION %s (TOPIC \'%s\') ', $connection, $topic);