Reputation: 181
I try to write and read events on an eventstore cluster using a .Net client. Therefore I've set up 3 local docker containers to test the write and read to the cluster. I've already set up the cluster and it says it's alive (1 Master and 2 Slaves).
Unfortunately when I try to write or read from the .Net client on the eventstore the connection opens and closes itself all the time and no calls get to the store. How can I connect to a cluster to write and read events?
If I use a eventstore without a cluster the methods works correctly and I can create and consume events. With the cluster I only receive the following log messages:
Start connecting to Eventstore
Start reading events:
[04,06:49:19.771,INFO] Discovering: found best choice [172.16.1.103:1112,n/a] (Master).
[04,06:49:19.771,INFO] Discovering attempt 1/10 successful: best candidate is [172.16.1.103:1112, n/a].
[10,06:49:20.396,INFO] ClientAPI TcpConnection closed [06:49:20.396: N172.16.1.103:1112, L, {220a086e-3394-49c3-affa-b3d6fc385ea2}]:
[10,06:49:20.396,INFO] Received bytes: 0, Sent bytes: 0
[10,06:49:20.396,INFO] Send calls: 0, callbacks: 0
[10,06:49:20.396,INFO] Receive calls: 0, callbacks: 0
[10,06:49:20.396,INFO] Close reason: [Success] Connection establishment timeout.
[10,06:49:20.397,DEBUG] TcpPackageConnection: connection [172.16.1.103:1112, L, {220a086e-3394-49c3-affa-b3d6fc385ea2}] was closed cleanly.
[10,06:49:22.564,INFO] Discovering attempt 1/10 failed: no candidate found.
[10,06:49:23.068,INFO] Discovering: found best choice [172.16.1.103:1112,n/a] (Master).
[10,06:49:23.068,INFO] Discovering attempt 2/10 successful: best candidate is [172.16.1.103:1112, n/a].
If I write an event to the clustered eventstore by using the WebUI it will accept it and the event can then be found in the WebUI of the eventstore.
What follows is my setup in case you need the details:
The eventstore cluster is set up as described in the eventstore docs.
The cluster is set up to not use dns by setting discover-via-dns=false
. Here is my .yaml
file for the docker setup:
version: '3'
services:
eventstore1:
image: "eventstore/eventstore:latest"
environment:
EVENTSTORE_DISABLE_HTTP_CACHING: "True"
EVENTSTORE_RUN_PROJECTIONS: ALL
EVENTSTORE_CLUSTER_SIZE: 3
EVENTSTORE_INT_IP: 172.16.1.101
EVENTSTORE_EXT_IP: 172.16.1.101
EVENTSTORE_INT_TCP_PORT: 1111
EVENTSTORE_EXT_TCP_PORT: 1112
EVENTSTORE_INT_HTTP_PORT: 2113
EVENTSTORE_EXT_HTTP_PORT: 2114
EVENTSTORE_DISCOVER_VIA_DNS: "False"
EVENTSTORE_GOSSIP_SEED: 172.16.1.102:2113,172.16.1.103:2113
EVENTSTORE_INT_HTTP_PREFIXES: "http://*:2113/"
EVENTSTORE_EXT_HTTP_PREFIXES: "http://*:2114/"
ports:
- 1111:1111
- 1112:1112
- 2113:2113
- 2114:2114
networks:
app_net:
ipv4_address: 172.16.1.101
eventstore2:
image: "eventstore/eventstore:latest"
environment:
EVENTSTORE_DISABLE_HTTP_CACHING: "True"
EVENTSTORE_RUN_PROJECTIONS: ALL
EVENTSTORE_CLUSTER_SIZE: 3
EVENTSTORE_INT_IP: 172.16.1.102
EVENTSTORE_EXT_IP: 172.16.1.102
EVENTSTORE_INT_TCP_PORT: 1111
EVENTSTORE_EXT_TCP_PORT: 1112
EVENTSTORE_INT_HTTP_PORT: 2113
EVENTSTORE_EXT_HTTP_PORT: 2114
EVENTSTORE_DISCOVER_VIA_DNS: "False"
EVENTSTORE_GOSSIP_SEED: 172.16.1.101:2113,172.16.1.103:2113
EVENTSTORE_INT_HTTP_PREFIXES: "http://*:2113/"
EVENTSTORE_EXT_HTTP_PREFIXES: "http://*:4114/"
ports:
- 3111:1111
- 3112:1112
- 4113:2113
- 4114:2114
networks:
app_net:
ipv4_address: 172.16.1.102
eventstore3:
image: "eventstore/eventstore:latest"
environment:
EVENTSTORE_DISABLE_HTTP_CACHING: "True"
EVENTSTORE_RUN_PROJECTIONS: ALL
EVENTSTORE_CLUSTER_SIZE: 3
EVENTSTORE_INT_IP: 172.16.1.103
EVENTSTORE_EXT_IP: 172.16.1.103
EVENTSTORE_INT_TCP_PORT: 1111
EVENTSTORE_EXT_TCP_PORT: 1112
EVENTSTORE_INT_HTTP_PORT: 2113
EVENTSTORE_EXT_HTTP_PORT: 2114
EVENTSTORE_DISCOVER_VIA_DNS: "False"
EVENTSTORE_GOSSIP_SEED: 172.16.1.101:2113,172.16.1.102:2113
EVENTSTORE_INT_HTTP_PREFIXES: "http://*:2113/"
EVENTSTORE_EXT_HTTP_PREFIXES: "http://*:2114/"
ports:
- 5111:1111
- 5112:1112
- 6113:2113
- 6114:2114
networks:
app_net:
ipv4_address: 172.16.1.103
networks:
app_net:
external: true
To test the connection and if it is possible to write and read on the eventstore cluster I created the following small application:
Main Program:
class Program
{
const string STREAM = "MyTestStream";
private static ConnectionFactory _connectionFactory = new ConnectionFactory();
static void Main(string[] args)
{
var useCluster = true;
Console.WriteLine("Start connecting to Eventstore");
var writeEvents = new WriteEvents(_connectionFactory);
writeEvents.Write(useCluster, STREAM).Wait();
Console.WriteLine("Start reading events:");
var readEvents = new ReadEvents(_connectionFactory);
readEvents.Read(useCluster, STREAM).Wait();
Console.WriteLine("Check if events are written and press key to exit");
Console.ReadKey();
}
}
ConnectionFactory:
public class ConnectionFactory
{
public IEventStoreConnection SingleConnection()
{
return EventStoreConnection.Create(ConnectionSettings.Create(),
new IPEndPoint(IPAddress.Parse("127.0.0.1"), 1113));
}
public IEventStoreConnection ClusterConnection()
{
return EventStoreConnection.Create(
ConnectionSettings.Create().KeepReconnecting().UseConsoleLogger()
.SetGossipSeedEndPoints(
new IPEndPoint(IPAddress.Parse("127.0.0.1"), 2113),
new IPEndPoint(IPAddress.Parse("127.0.0.1"), 4113),
new IPEndPoint(IPAddress.Parse("127.0.0.1"), 6113))
.WithConnectionTimeoutOf(TimeSpan.FromMilliseconds(500))
);
}
public IEventStoreConnection CreateConnection(bool useCluster)
{
if (useCluster)
{
return ClusterConnection();
}
return SingleConnection();
}
}
The connection for the Cluster is created as described on eventstore documentation however, the documentation is not 100% aligned with the latest nuget version I use.
nuget Version:
<PackageReference Include="EventStore.ClientAPI.NetCore" Version="4.1.0.23" />
WriteEvents:
public class WriteEvents
{
private ConnectionFactory _connectionFactory;
public WriteEvents(ConnectionFactory connectionFactory)
{
_connectionFactory = connectionFactory;
}
public async Task Write(bool useCluster, string streamName)
{
using (var conn = _connectionFactory.CreateConnection(useCluster))
{
await conn.ConnectAsync();
for (var x = 0; x < 100; x++)
{
await conn.AppendToStreamAsync(streamName,
ExpectedVersion.Any,
GetEventDataFor(x));
Console.WriteLine("event " + x + " written.");
}
}
}
private IEnumerable<EventData> GetEventDataFor(int i)
{
yield return new EventData(
Guid.NewGuid(),
"MyTestEvent",
true,
Encoding.ASCII.GetBytes("{'somedata' : " + i + "}"),
Encoding.ASCII.GetBytes("{'metadata' : " + i + "}")
);
}
}
ReadEvents:
public class ReadEvents
{
private ConnectionFactory _connectionFactory;
public ReadEvents(ConnectionFactory connectionFactory)
{
_connectionFactory = connectionFactory;
}
public async Task Read(bool useCluster, string streamName)
{
using (var conn = _connectionFactory.CreateConnection(useCluster))
{
await conn.ConnectAsync();
var slice = await conn.ReadStreamEventsForwardAsync(streamName, 0, 100, false);
foreach (var evt in slice.Events)
{
Console.WriteLine($"Received event. Type: '{evt.Event.EventType}', Data: '{Encoding.UTF8.GetString(evt.Event.Data)}'");
}
}
}
}
Upvotes: 2
Views: 1934
Reputation: 11
this is not an issue related to the type of client. You'll get the same behaviour also with the non NetCore. When the client tries to connect to one node in the cluster, EventStore replies with an ip that is not reachable. In some scenarios as when you use a software orchestrator like Kubernetes or swarm your clients need to be hosted within the same overlay network to be able to connect to the cluster using the Tcp Client. While you develop your app/microservice you can use a connection string to a single-node EventStore not hosted in Kubernetes, swarm or the local Docker engine. When you deploy your app in a Kubernetes, swarm or Docker-Compose environment you can set the connection string of your app with the gossip seeds and use the ips of your cluster hosts.
C# Client using a cluster connection
namespace TestClusterConnection
{
class Program
{
private const string Stream = "MyTestStream";
static void Main(string[] args)
{
try
{
var useCluster = true;
Console.WriteLine("Start connecting to Eventstore");
Write(useCluster, Stream).Wait();
Console.WriteLine("Start reading events:");
Read(useCluster, Stream).Wait();
Console.WriteLine("Check if events are written and press key to exit");
}
catch (Exception e)
{
Console.WriteLine(e.GetBaseException().Message);
}
Console.ReadKey();
}
public static async Task Read(bool useCluster, string streamName)
{
using (var conn = CreateConnection(useCluster))
{
await conn.ConnectAsync();
var slice = await conn.ReadStreamEventsForwardAsync(streamName, 0, 100, false);
foreach (var evt in slice.Events)
Console.WriteLine($"Received event. Type: '{evt.Event.EventType}', Data: '{Encoding.UTF8.GetString(evt.Event.Data)}'");
}
}
private static async Task Write(bool useCluster, string streamName)
{
using (var conn = CreateConnection(useCluster))
{
await conn.ConnectAsync();
for (var x = 0; x < 100; x++)
{
await conn.AppendToStreamAsync(streamName,
ExpectedVersion.Any,
GetEventDataFor(x));
Console.WriteLine("event " + x + " written.");
}
}
}
private static IEnumerable<EventData> GetEventDataFor(int i)
{
yield return new EventData(
Guid.NewGuid(),
"MyTestEvent",
true,
Encoding.ASCII.GetBytes("{'somedata' : " + i + "}"),
Encoding.ASCII.GetBytes("{'metadata' : " + i + "}")
);
}
private static IEventStoreConnection SingleConnection()
{
return EventStoreConnection.Create(ConnectionSettings.Create(),
new IPEndPoint(IPAddress.Parse("127.0.0.1"), 1113));
}
private static IEventStoreConnection ClusterConnection()
{
return EventStoreConnection.Create(
ConnectionSettings.Create().KeepRetrying().KeepReconnecting().UseConsoleLogger()
.SetGossipSeedEndPoints(
new IPEndPoint(IPAddress.Parse("172.19.0.2"), 2112),
new IPEndPoint(IPAddress.Parse("172.19.0.3"), 2112),
new IPEndPoint(IPAddress.Parse("172.19.0.4"), 2112))
.SetHeartbeatInterval(TimeSpan.FromSeconds(3))
.SetHeartbeatTimeout(TimeSpan.FromSeconds(6))
.WithConnectionTimeoutOf(TimeSpan.FromSeconds(10))
);
}
private static IEventStoreConnection CreateConnection(bool useCluster)
{
return useCluster ? ClusterConnection() : SingleConnection();
}
}
}
Dockerfile
FROM mono:4.6.2.16
ADD . /home/TestClusterConnection
CMD [ "mono", "home/TestClusterConnection/TestClusterConnection.exe" ]
docker-compose.yaml
version: '3.4'
services:
esclienttest:
image: testclient
build:
context: .
dockerfile: Dockerfile
depends_on:
- eventstore1
- eventstore2
- eventstore3
eventstore1:
image: eventstore/eventstore:release-4.1.0
hostname: eventstore1
ports:
- 1113:1113
- 2112:2112
environment:
EVENTSTORE_CLUSTER_DNS: eventstore1
EVENTSTORE_CLUSTER_SIZE: 3
EVENTSTORE_CLUSTER_GOSSIP_PORT: 2112
EVENTSTORE_EXT_IP_ADVERTISE_AS: 172.19.0.2
eventstore2:
image: eventstore/eventstore:release-4.1.0
hostname: eventstore2
environment:
EVENTSTORE_CLUSTER_DNS: eventstore1
EVENTSTORE_CLUSTER_SIZE: 3
EVENTSTORE_CLUSTER_GOSSIP_PORT: 2112
EVENTSTORE_EXT_IP_ADVERTISE_AS: 172.19.0.3
eventstore3:
image: eventstore/eventstore:release-4.1.0
hostname: eventstore3
environment:
EVENTSTORE_CLUSTER_DNS: eventstore1
EVENTSTORE_CLUSTER_SIZE: 3
EVENTSTORE_CLUSTER_GOSSIP_PORT: 2112
EVENTSTORE_EXT_IP_ADVERTISE_AS: 172.19.0.4
That is for test only and further network settings could be required.
Hope this helps.
Riccardo
Upvotes: 1