Skip to content
Merged
80 changes: 80 additions & 0 deletions foreign/node/src/client/client.connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,33 @@ import { serializeCommand } from './client.utils.js';
import { debug } from './client.debug.js';


/**
* Creates a TCP socket connection.
*
* @param options - TCP connection options
* @returns TCP socket
*/
const createTcpSocket = (options: TcpOption): Socket => {
return createConnection(options);
};

/**
* Creates a TLS socket connection.
*
* @param options - TLS connection options including port
* @returns TLS socket
*/
const createTlsSocket = ({ port, ...options }: TlsOption): Socket => {
const socket = TLSConnect(port, options);
return socket;
};

/**
* Creates a socket based on the transport type in the configuration.
*
* @param config - Client configuration with transport type
* @returns Socket for the specified transport
*/
const getTransport = (config: ClientConfig): Socket => {
const { transport, options } = config;
switch (transport) {
Expand All @@ -46,12 +64,24 @@ const getTransport = (config: ClientConfig): Socket => {
}
};

/**
* Default reconnection settings.
* Attempts reconnection every 5 seconds, up to 12 times.
*/
const DefaultReconnectOption: ReconnectOption = {
enabled: true,
interval: 5 * 1000,
maxRetries: 12
}

/**
* Recreates a socket after a delay.
* Used for reconnection attempts.
*
* @param option - Client configuration
* @param timer - Delay in milliseconds before recreating
* @returns Promise resolving to a new socket
*/
function recreate(option: ClientConfig, timer = 1000): Promise<Socket> {
return new Promise((resolve) => {
setTimeout(() => {
Expand All @@ -60,21 +90,40 @@ function recreate(option: ClientConfig, timer = 1000): Promise<Socket> {
});
}

/** Socket error with optional error code */
type SocketError = Error & { code?: string };

/**
* Manages the low-level TCP/TLS connection to the Iggy server.
* Handles connection lifecycle, reconnection, and data buffering.
*/
export class IggyConnection extends EventEmitter {
/** Client configuration */
public config: ClientConfig
/** Underlying socket connection */
public socket: Socket;

/** Whether the connection is established */
public connected: boolean;
/** Whether a connection attempt is in progress */
public connecting: boolean;
/** Whether the connection is being intentionally closed */
public ending: boolean;
/** Whether waiting for more data to complete a response */
private waitingResponseEnd: boolean;
/** Reconnection configuration */
private reconnectOption: ReconnectOption;
/** Number of reconnection attempts made */
private reconnectCount: number;

/** Buffer for incomplete response data */
private readBuffers: Buffer;

/**
* Creates a new IggyConnection.
*
* @param config - Client configuration
*/
constructor(config: ClientConfig) {
super();
this.config = config;
Expand All @@ -88,6 +137,12 @@ export class IggyConnection extends EventEmitter {
this.readBuffers = Buffer.allocUnsafe(0);
}

/**
* Establishes the connection to the server.
* Sets up event handlers for data, errors, and disconnection.
*
* @returns Promise that resolves when connected
*/
connect() {
this.connecting = true;

Expand Down Expand Up @@ -122,6 +177,12 @@ export class IggyConnection extends EventEmitter {
});
}

/**
* Attempts to reconnect to the server.
* Respects maxRetries limit and emits error when exceeded.
*
* @param err - Optional error that triggered the reconnection
*/
async reconnect(err?: Error) {
const { enabled, interval, maxRetries } = this.reconnectOption
debug(
Expand Down Expand Up @@ -149,16 +210,28 @@ export class IggyConnection extends EventEmitter {
this.connect();
}

/**
* Destroys the connection and marks it as ending.
*/
_destroy() {
this.ending = true;
this.socket.destroy();
}

/**
* Clears the response buffer and resets the waiting state.
*/
_endResponseWait() {
this.readBuffers = Buffer.allocUnsafe(0);
this.waitingResponseEnd = false;
}

/**
* Handles incoming data from the socket.
* Buffers incomplete responses and emits complete ones.
*
* @param data - Incoming data buffer
*/
_onData(data: Buffer) {
debug(
'ONDATA',
Expand Down Expand Up @@ -211,6 +284,13 @@ export class IggyConnection extends EventEmitter {
this._endResponseWait();
}

/**
* Writes a command to the socket.
*
* @param command - Command code
* @param payload - Command payload
* @returns True if the write was successful
*/
writeCommand(command: number, payload: Buffer): boolean {
const cmd = serializeCommand(command, payload);
return this.socket.write(cmd);
Expand Down
4 changes: 4 additions & 0 deletions foreign/node/src/client/client.debug.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,8 @@

import Debug from 'debug';

/**
* Debug logger for the Iggy client.
* Enable with DEBUG=iggy:client environment variable.
*/
export const debug = Debug('iggy:client');
101 changes: 101 additions & 0 deletions foreign/node/src/client/client.socket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,29 +31,55 @@ import { IggyConnection } from './client.connection.js';
import { LOGIN, LOGIN_WITH_TOKEN, PING } from '../wire/index.js';


/**
* Command codes that can be executed without authentication.
*/
const UNLOGGED_COMMAND_CODE = [
PING.code,
LOGIN.code,
LOGIN_WITH_TOKEN.code
];

/**
* Represents a queued command job waiting to be executed.
*/
type Job = {
/** Command code */
command: number,
/** Command payload */
payload: Buffer,
/** Promise resolve function */
resolve: (v: CommandResponse | PromiseLike<CommandResponse>) => void,
/** Promise reject function */
reject: (e: unknown) => void
};


/**
* Manages command execution and response handling for the Iggy server.
* Implements command queuing, authentication, and heartbeat functionality.
*/
export class CommandResponseStream extends EventEmitter {
/** Client configuration */
private options: ClientConfig;
/** Underlying connection to the server */
private connection: IggyConnection;
/** Queue of pending command jobs */
private _execQueue: Job[];
/** Whether the stream is currently processing a command */
public busy: boolean;
/** Whether the client has been authenticated */
isAuthenticated: boolean;
/** Authenticated user ID */
userId?: number;
/** Heartbeat interval timer handle */
heartbeatIntervalHandler?: NodeJS.Timeout;

/**
* Creates a new CommandResponseStream.
*
* @param options - Client configuration
*/
constructor(options: ClientConfig) {
super();
this.options = options;
Expand All @@ -64,13 +90,26 @@ export class CommandResponseStream extends EventEmitter {
this._init();
};

/**
* Initializes the stream by setting up heartbeat and connection event handlers.
*/
_init() {
this.heartbeat(this.options.heartbeatInterval);
this.connection.on('disconnected', async () => {
this.isAuthenticated = false;
});
}

/**
* Sends a command to the server.
* Automatically handles connection and authentication if needed.
*
* @param command - Command code to send
* @param payload - Command payload buffer
* @param handleResponse - Whether to parse the response (default: true)
* @param last - Whether to add to end of queue (default: true)
* @returns Promise resolving to the command response
*/
async sendCommand(
command: number,
payload: Buffer,
Expand All @@ -93,6 +132,12 @@ export class CommandResponseStream extends EventEmitter {
});
}

/**
* Processes queued commands sequentially.
* Emits 'finishQueue' when all commands are processed.
*
* @param handleResponse - Whether to parse responses
*/
async _processQueue(handleResponse = true): Promise<void> {
if (this.busy)
return;
Expand All @@ -111,6 +156,14 @@ export class CommandResponseStream extends EventEmitter {
this.emit('finishQueue');
}

/**
* Processes a single command by writing it to the connection and waiting for response.
*
* @param command - Command code
* @param payload - Command payload
* @param handleResp - Whether to parse the response
* @returns Promise resolving to the command response
*/
_processNext(
command: number,
payload: Buffer,
Expand All @@ -135,11 +188,22 @@ export class CommandResponseStream extends EventEmitter {
});
}

/**
* Fails all queued commands with the given error.
*
* @param err - Error to reject all queued commands with
*/
_failQueue(err: Error) {
this._execQueue.forEach(({ reject }) => reject(err));
this._execQueue = [];
}

/**
* Authenticates the client with the server.
*
* @param creds - Authentication credentials (token or password)
* @returns True if authentication succeeded
*/
async authenticate(creds: ClientCredentials) {
const r = ('token' in creds) ?
await this._authWithToken(creds) :
Expand All @@ -149,24 +213,46 @@ export class CommandResponseStream extends EventEmitter {
return this.isAuthenticated;
}

/**
* Authenticates using username and password.
*
* @param creds - Password credentials
* @returns Login response with user ID
*/
async _authWithPassword(creds: PasswordCredentials) {
const pl = LOGIN.serialize(creds);
const logr = await this.sendCommand(LOGIN.code, pl, true, false);
return LOGIN.deserialize(logr);
}

/**
* Authenticates using a token.
*
* @param creds - Token credentials
* @returns Login response with user ID
*/
async _authWithToken(creds: TokenCredentials) {
const pl = LOGIN_WITH_TOKEN.serialize(creds);
const logr = await this.sendCommand(LOGIN_WITH_TOKEN.code, pl, true, false);
return LOGIN_WITH_TOKEN.deserialize(logr);
}

/**
* Sends a ping command to the server.
*
* @returns Ping response
*/
async ping() {
const pl = PING.serialize();
const pingR = await this.sendCommand(PING.code, pl, true);
return PING.deserialize(pingR);
}

/**
* Starts sending periodic heartbeat pings to keep the connection alive.
*
* @param interval - Heartbeat interval in milliseconds
*/
heartbeat(interval?: number) {
if (!interval)
return
Expand All @@ -179,10 +265,19 @@ export class CommandResponseStream extends EventEmitter {
}, interval);
}

/**
* Returns the underlying socket as a readable stream.
*
* @returns The connection socket
*/
getReadStream() {
return this.connection.socket;
}

/**
* Destroys the stream and cleans up resources.
* Stops heartbeat and destroys the connection.
*/
destroy() {
if (this.heartbeatIntervalHandler)
clearInterval(this.heartbeatIntervalHandler);
Expand All @@ -191,6 +286,12 @@ export class CommandResponseStream extends EventEmitter {
};


/**
* Creates a new RawClient instance.
*
* @param options - Client configuration
* @returns RawClient instance
*/
export function getRawClient(options: ClientConfig): RawClient {
return new CommandResponseStream(options);
}
Loading