Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 60 additions & 0 deletions src/NetMQ.Tests/SocketOptionsTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -147,5 +147,65 @@ public void HelloMsgInproc()

Assert.Equal("H", msg);
}


[Fact]
public void DisconnectMsgInProc()
{
// Create a router
using var router = new RouterSocket();
router.Options.DisconnectMessage = new byte[] {(byte)'D'};

// bind router
router.Bind("inproc://inproc-hello-msg");

// create a dealer
using var dealer = new DealerSocket();
dealer.Options.HelloMessage = new byte[] {(byte)'H'};
dealer.Connect("inproc://inproc-hello-msg");

var msg = router.ReceiveMultipartMessage();

Assert.Equal("H", msg.Last.ConvertToString());

dealer.Close();

var routerMsg = router.ReceiveMultipartMessage();


Assert.Equal("D",
routerMsg.Last.ConvertToString());

}


[Fact]
public void DisconnectMsgTcp()
{
// Create a router
using var router = new RouterSocket();
router.Options.DisconnectMessage = new byte[] {(byte)'D'};

// bind router
int port = router.BindRandomPort("tcp://*");

// create a dealer
using var dealer = new DealerSocket();
dealer.Options.HelloMessage = new byte[] {(byte)'H'};
dealer.Connect($"tcp://localhost:{port}");

var msg = router.ReceiveMultipartMessage();

Assert.Equal("H", msg.Last.ConvertToString());

dealer.Close();

var routerMsg = router.ReceiveMultipartMessage();


Assert.Equal("D",
routerMsg.Last.ConvertToString());

}
}
}
36 changes: 35 additions & 1 deletion src/NetMQ/Core/Options.cs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ public Options()
HeartbeatTimeout = -1;
HelloMsg = null;
CanSendHelloMsg = false;
DisconnectMsg = null;
CanGenerateDisconnectMsg = false;
Correlate = false;
Relaxed = false;
}
Expand Down Expand Up @@ -321,12 +323,23 @@ public byte IdentitySize {
/// Hello msg to send to peer upon connecting
/// </summary>
public byte[]? HelloMsg { get; set; }



/// <summary>
/// Disconnect msg to send to peer upon disconnecting
/// </summary>
public byte[]? DisconnectMsg { get; set; }

/// <summary>
/// Indicate of socket can send an hello msg
/// </summary>
public bool CanSendHelloMsg { get; set; }

/// <summary>
/// Indicate of socket can generate a disconnect msg
/// </summary>
public bool CanGenerateDisconnectMsg { get; set; }

public bool Correlate { get; set; }
public bool Relaxed { get; set; }

Expand Down Expand Up @@ -528,6 +541,27 @@ public void SetSocketOption(ZmqSocketOption option, object? optionValue)
break;
}


case ZmqSocketOption.DisconnectMessage:
{
if (optionValue == null)
{
DisconnectMsg = null;
}
else if( CanGenerateDisconnectMsg )
{
var disconnectMsg = Get<byte[]>();
DisconnectMsg = new byte[disconnectMsg.Length];

Buffer.BlockCopy(disconnectMsg, 0, DisconnectMsg, 0, disconnectMsg.Length);
}
else
{
throw new InvalidException("Socket doesn't support disconnect message");
}
break;
}

case ZmqSocketOption.Relaxed:
{
Relaxed = Get<bool>();
Expand Down
1 change: 1 addition & 0 deletions src/NetMQ/Core/Patterns/Peer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ public Peer(Ctx parent, int threadId, int socketId)
m_nextPeerId = (uint) s_random.Next();
m_options.SocketType = ZmqSocketType.Peer;
m_options.CanSendHelloMsg = true;
m_options.CanGenerateDisconnectMsg = true;
m_fairQueueing = new FairQueueing();
m_prefetchedMsg = new Msg();
m_prefetchedMsg.InitEmpty();
Expand Down
1 change: 1 addition & 0 deletions src/NetMQ/Core/Patterns/Router.cs
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ public Router(Ctx parent, int threadId, int socketId)
m_nextPeerId = s_random.Next();
m_options.SocketType = ZmqSocketType.Router;
m_options.CanSendHelloMsg = true;
m_options.CanGenerateDisconnectMsg = true;
m_fairQueueing = new FairQueueing();
m_prefetchedId = new Msg();
m_prefetchedId.InitEmpty();
Expand Down
1 change: 1 addition & 0 deletions src/NetMQ/Core/Patterns/Server.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public Server(Ctx parent, int threadId, int socketId)
m_nextRoutingId = (uint) s_random.Next();
m_options.SocketType = ZmqSocketType.Server;
m_options.CanSendHelloMsg = true;
m_options.CanGenerateDisconnectMsg = true;
m_fairQueueing = new FairQueueing();
m_outpipes = new Dictionary<uint, Outpipe>();
}
Expand Down
26 changes: 26 additions & 0 deletions src/NetMQ/Core/Pipe.cs
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,11 @@ private enum State
/// </summary>
private readonly ZObject m_parent;

/// <summary>
/// Disconnect msg
/// </summary>
private Msg m_disconnectMsg;

/// <summary>
/// Create a new Pipe object with the given parent, and inbound and outbound YPipes.
/// </summary>
Expand All @@ -167,6 +172,7 @@ private Pipe(
m_sink = null;
m_state = State.Active;
m_delay = true;
m_disconnectMsg.InitEmpty();
}

/// <summary>
Expand Down Expand Up @@ -229,6 +235,15 @@ public void SetEventSink(IPipeEvents sink)
m_sink = sink;
}

public void SetDisconnectMsg(byte[] disconnectMsg)
{
var msg = new Msg();

msg.InitPool(disconnectMsg.Length);
msg.Put(disconnectMsg, 0, disconnectMsg.Length);
m_disconnectMsg = msg;
}

/// <summary>
/// Get or set the byte-array that comprises the identity of this Pipe.
/// </summary>
Expand Down Expand Up @@ -690,5 +705,16 @@ public override string ToString()
{
return base.ToString() + "[" + m_parent + "]";
}

public void SendDisconnectMessage()
{
if (m_disconnectMsg.Size > 0)
{
Rollback();
m_outboundPipe?.Write(ref m_disconnectMsg, false);
Flush();
m_disconnectMsg.InitEmpty();
}
}
}
}
14 changes: 13 additions & 1 deletion src/NetMQ/Core/SessionBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,11 @@ protected override void ProcessAttach(IEngine engine)
Debug.Assert(m_pipe == null);
m_pipe = pipes[0];

if (m_options.CanGenerateDisconnectMsg && m_options.DisconnectMsg is not null)
{
pipes[1].SetDisconnectMsg(m_options.DisconnectMsg);
}

// Ask socket to plug into the remote end of the pipe.
SendBind(m_socket, pipes[1]);
}
Expand All @@ -412,14 +417,21 @@ protected override void ProcessAttach(IEngine engine)
/// <summary>
/// Flush out any leftover messages and call Detached.
/// </summary>
public void Detach()
public void Detach(bool handshaked)
{
// Engine is dead. Let's forget about it.
m_engine = null;

// Remove any half-done messages from the pipes.
CleanPipes();

// Only send disconnect message if socket was accepted and handshake was completed
if (m_pipe is not null && m_pipe.Active && handshaked && m_options.CanGenerateDisconnectMsg && m_options.DisconnectMsg?.Length > 0)
{
m_pipe.SetDisconnectMsg(m_options.DisconnectMsg);
m_pipe.SendDisconnectMessage();
}

// Send the event to the derived class.
Detached();

Expand Down
Loading
Loading