b4n4n4
b4n4n4

Reputation: 181

How to write and read events on Eventstore Cluster by .Net client

I try to write and read events on an eventstore cluster using a .Net client. Therefore I've set up 3 local docker containers to test the write and read to the cluster. I've already set up the cluster and it says it's alive (1 Master and 2 Slaves).

Unfortunately when I try to write or read from the .Net client on the eventstore the connection opens and closes itself all the time and no calls get to the store. How can I connect to a cluster to write and read events?

If I use a eventstore without a cluster the methods works correctly and I can create and consume events. With the cluster I only receive the following log messages:

Start connecting to Eventstore
Start reading events:
[04,06:49:19.771,INFO] Discovering: found best choice [172.16.1.103:1112,n/a] (Master).
[04,06:49:19.771,INFO] Discovering attempt 1/10 successful: best candidate is [172.16.1.103:1112, n/a].
[10,06:49:20.396,INFO] ClientAPI TcpConnection closed [06:49:20.396: N172.16.1.103:1112, L, {220a086e-3394-49c3-affa-b3d6fc385ea2}]:
[10,06:49:20.396,INFO] Received bytes: 0, Sent bytes: 0
[10,06:49:20.396,INFO] Send calls: 0, callbacks: 0
[10,06:49:20.396,INFO] Receive calls: 0, callbacks: 0
[10,06:49:20.396,INFO] Close reason: [Success] Connection establishment timeout.
[10,06:49:20.397,DEBUG] TcpPackageConnection: connection [172.16.1.103:1112, L, {220a086e-3394-49c3-affa-b3d6fc385ea2}] was closed cleanly.
[10,06:49:22.564,INFO] Discovering attempt 1/10 failed: no candidate found.
[10,06:49:23.068,INFO] Discovering: found best choice [172.16.1.103:1112,n/a] (Master).
[10,06:49:23.068,INFO] Discovering attempt 2/10 successful: best candidate is [172.16.1.103:1112, n/a].

If I write an event to the clustered eventstore by using the WebUI it will accept it and the event can then be found in the WebUI of the eventstore.


What follows is my setup in case you need the details:

The eventstore cluster is set up as described in the eventstore docs. The cluster is set up to not use dns by setting discover-via-dns=false. Here is my .yaml file for the docker setup:

version: '3'
services:
  eventstore1:
    image: "eventstore/eventstore:latest"
    environment:
      EVENTSTORE_DISABLE_HTTP_CACHING: "True"
      EVENTSTORE_RUN_PROJECTIONS: ALL
      EVENTSTORE_CLUSTER_SIZE: 3
      EVENTSTORE_INT_IP: 172.16.1.101
      EVENTSTORE_EXT_IP: 172.16.1.101
      EVENTSTORE_INT_TCP_PORT: 1111
      EVENTSTORE_EXT_TCP_PORT: 1112
      EVENTSTORE_INT_HTTP_PORT: 2113
      EVENTSTORE_EXT_HTTP_PORT: 2114
      EVENTSTORE_DISCOVER_VIA_DNS: "False"
      EVENTSTORE_GOSSIP_SEED: 172.16.1.102:2113,172.16.1.103:2113
      EVENTSTORE_INT_HTTP_PREFIXES: "http://*:2113/"
      EVENTSTORE_EXT_HTTP_PREFIXES: "http://*:2114/"
    ports:
      - 1111:1111
      - 1112:1112
      - 2113:2113
      - 2114:2114
    networks:
      app_net:
        ipv4_address: 172.16.1.101
  eventstore2:
    image: "eventstore/eventstore:latest"
    environment:
      EVENTSTORE_DISABLE_HTTP_CACHING: "True"
      EVENTSTORE_RUN_PROJECTIONS: ALL
      EVENTSTORE_CLUSTER_SIZE: 3
      EVENTSTORE_INT_IP: 172.16.1.102
      EVENTSTORE_EXT_IP: 172.16.1.102
      EVENTSTORE_INT_TCP_PORT: 1111
      EVENTSTORE_EXT_TCP_PORT: 1112
      EVENTSTORE_INT_HTTP_PORT: 2113
      EVENTSTORE_EXT_HTTP_PORT: 2114
      EVENTSTORE_DISCOVER_VIA_DNS: "False"
      EVENTSTORE_GOSSIP_SEED: 172.16.1.101:2113,172.16.1.103:2113
      EVENTSTORE_INT_HTTP_PREFIXES: "http://*:2113/"
      EVENTSTORE_EXT_HTTP_PREFIXES: "http://*:4114/"
    ports:
      - 3111:1111
      - 3112:1112
      - 4113:2113
      - 4114:2114
    networks:
      app_net:
        ipv4_address: 172.16.1.102
  eventstore3:
    image: "eventstore/eventstore:latest"
    environment:
      EVENTSTORE_DISABLE_HTTP_CACHING: "True"
      EVENTSTORE_RUN_PROJECTIONS: ALL
      EVENTSTORE_CLUSTER_SIZE: 3
      EVENTSTORE_INT_IP: 172.16.1.103
      EVENTSTORE_EXT_IP: 172.16.1.103
      EVENTSTORE_INT_TCP_PORT: 1111
      EVENTSTORE_EXT_TCP_PORT: 1112
      EVENTSTORE_INT_HTTP_PORT: 2113
      EVENTSTORE_EXT_HTTP_PORT: 2114
      EVENTSTORE_DISCOVER_VIA_DNS: "False"
      EVENTSTORE_GOSSIP_SEED: 172.16.1.101:2113,172.16.1.102:2113
      EVENTSTORE_INT_HTTP_PREFIXES: "http://*:2113/"
      EVENTSTORE_EXT_HTTP_PREFIXES: "http://*:2114/"
    ports:
      - 5111:1111
      - 5112:1112
      - 6113:2113
      - 6114:2114
    networks:
      app_net:
        ipv4_address: 172.16.1.103
networks:
  app_net:
    external: true

To test the connection and if it is possible to write and read on the eventstore cluster I created the following small application:

Main Program:

class Program
{
    const string STREAM = "MyTestStream";
    private static ConnectionFactory _connectionFactory = new ConnectionFactory();

    static void Main(string[] args)
    {
        var useCluster = true;
        Console.WriteLine("Start connecting to Eventstore");
        var writeEvents = new WriteEvents(_connectionFactory);
        writeEvents.Write(useCluster, STREAM).Wait();
        Console.WriteLine("Start reading events:");
        var readEvents = new ReadEvents(_connectionFactory);
        readEvents.Read(useCluster, STREAM).Wait();
        Console.WriteLine("Check if events are written and press key to exit");
        Console.ReadKey();
    }
}

ConnectionFactory:

public class ConnectionFactory
{
    public IEventStoreConnection SingleConnection()
    {
        return EventStoreConnection.Create(ConnectionSettings.Create(),
            new IPEndPoint(IPAddress.Parse("127.0.0.1"), 1113));
    }

    public IEventStoreConnection ClusterConnection()
    {
        return EventStoreConnection.Create(
            ConnectionSettings.Create().KeepReconnecting().UseConsoleLogger()
                .SetGossipSeedEndPoints(
                    new IPEndPoint(IPAddress.Parse("127.0.0.1"), 2113),
                    new IPEndPoint(IPAddress.Parse("127.0.0.1"), 4113),
                    new IPEndPoint(IPAddress.Parse("127.0.0.1"), 6113))
                .WithConnectionTimeoutOf(TimeSpan.FromMilliseconds(500))
                );
    }

    public IEventStoreConnection CreateConnection(bool useCluster)
    {
        if (useCluster)
        {
            return ClusterConnection();
        }

        return SingleConnection();
    }
}

The connection for the Cluster is created as described on eventstore documentation however, the documentation is not 100% aligned with the latest nuget version I use.

nuget Version:

<PackageReference Include="EventStore.ClientAPI.NetCore" Version="4.1.0.23" />

WriteEvents:

public class WriteEvents
{
    private ConnectionFactory _connectionFactory;

    public WriteEvents(ConnectionFactory connectionFactory)
    {
        _connectionFactory = connectionFactory;
    }

    public async Task Write(bool useCluster, string streamName)
    {
        using (var conn = _connectionFactory.CreateConnection(useCluster))
        {
            await conn.ConnectAsync();
            for (var x = 0; x < 100; x++)
            {
                await conn.AppendToStreamAsync(streamName,
                    ExpectedVersion.Any,
                    GetEventDataFor(x));
                Console.WriteLine("event " + x + " written.");
            }
        }
    }

    private IEnumerable<EventData> GetEventDataFor(int i)
    {
        yield return new EventData(
            Guid.NewGuid(),
            "MyTestEvent",
            true,
            Encoding.ASCII.GetBytes("{'somedata' : " + i + "}"),
            Encoding.ASCII.GetBytes("{'metadata' : " + i + "}")
        );
    }
}

ReadEvents:

public class ReadEvents
{
    private ConnectionFactory _connectionFactory;

    public ReadEvents(ConnectionFactory connectionFactory)
    {
        _connectionFactory = connectionFactory;
    }

    public async Task Read(bool useCluster, string streamName)
    {

        using (var conn = _connectionFactory.CreateConnection(useCluster))
        {
            await conn.ConnectAsync();
            var slice = await conn.ReadStreamEventsForwardAsync(streamName, 0, 100, false);
            foreach (var evt in slice.Events)
            {
                Console.WriteLine($"Received event. Type: '{evt.Event.EventType}', Data: '{Encoding.UTF8.GetString(evt.Event.Data)}'");
            }
        }
    }
}

Upvotes: 2

Views: 1934

Answers (1)

riccardone
riccardone

Reputation: 11

this is not an issue related to the type of client. You'll get the same behaviour also with the non NetCore. When the client tries to connect to one node in the cluster, EventStore replies with an ip that is not reachable. In some scenarios as when you use a software orchestrator like Kubernetes or swarm your clients need to be hosted within the same overlay network to be able to connect to the cluster using the Tcp Client. While you develop your app/microservice you can use a connection string to a single-node EventStore not hosted in Kubernetes, swarm or the local Docker engine. When you deploy your app in a Kubernetes, swarm or Docker-Compose environment you can set the connection string of your app with the gossip seeds and use the ips of your cluster hosts.

C# Client using a cluster connection

namespace TestClusterConnection
{
    class Program
    {
        private const string Stream = "MyTestStream";

        static void Main(string[] args)
        {
            try
            {
                var useCluster = true;
                Console.WriteLine("Start connecting to Eventstore");
                Write(useCluster, Stream).Wait();
                Console.WriteLine("Start reading events:");
                Read(useCluster, Stream).Wait();
                Console.WriteLine("Check if events are written and press key to exit");
            }
            catch (Exception e)
            {
                Console.WriteLine(e.GetBaseException().Message);
            }
            Console.ReadKey();
        }

        public static async Task Read(bool useCluster, string streamName)
        {
            using (var conn = CreateConnection(useCluster))
            {
                await conn.ConnectAsync();
                var slice = await conn.ReadStreamEventsForwardAsync(streamName, 0, 100, false);
                foreach (var evt in slice.Events)
                    Console.WriteLine($"Received event. Type: '{evt.Event.EventType}', Data: '{Encoding.UTF8.GetString(evt.Event.Data)}'");
            }
        }

        private static async Task Write(bool useCluster, string streamName)
        {
            using (var conn = CreateConnection(useCluster))
            {
                await conn.ConnectAsync();
                for (var x = 0; x < 100; x++)
                {
                    await conn.AppendToStreamAsync(streamName,
                        ExpectedVersion.Any,
                        GetEventDataFor(x));
                    Console.WriteLine("event " + x + " written.");
                }
            }
        }

        private static IEnumerable<EventData> GetEventDataFor(int i)
        {
            yield return new EventData(
                Guid.NewGuid(),
                "MyTestEvent",
                true,
                Encoding.ASCII.GetBytes("{'somedata' : " + i + "}"),
                Encoding.ASCII.GetBytes("{'metadata' : " + i + "}")
            );
        }


        private static IEventStoreConnection SingleConnection()
        {
            return EventStoreConnection.Create(ConnectionSettings.Create(),
                new IPEndPoint(IPAddress.Parse("127.0.0.1"), 1113));
        }

        private static IEventStoreConnection ClusterConnection()
        {
            return EventStoreConnection.Create(
                ConnectionSettings.Create().KeepRetrying().KeepReconnecting().UseConsoleLogger()
                    .SetGossipSeedEndPoints(
                        new IPEndPoint(IPAddress.Parse("172.19.0.2"), 2112),
                        new IPEndPoint(IPAddress.Parse("172.19.0.3"), 2112),
                        new IPEndPoint(IPAddress.Parse("172.19.0.4"), 2112))
                    .SetHeartbeatInterval(TimeSpan.FromSeconds(3))
                    .SetHeartbeatTimeout(TimeSpan.FromSeconds(6))
                    .WithConnectionTimeoutOf(TimeSpan.FromSeconds(10))
            );
        }

        private static IEventStoreConnection CreateConnection(bool useCluster)
        {
            return useCluster ? ClusterConnection() : SingleConnection();
        }
    }
}

Dockerfile

FROM mono:4.6.2.16
ADD . /home/TestClusterConnection
CMD [ "mono",  "home/TestClusterConnection/TestClusterConnection.exe" ]

docker-compose.yaml

version: '3.4'

services:
  esclienttest:
    image: testclient
    build:
      context: .
      dockerfile: Dockerfile
    depends_on:
    - eventstore1
    - eventstore2
    - eventstore3

  eventstore1:
    image: eventstore/eventstore:release-4.1.0
    hostname: eventstore1    
    ports:    
    - 1113:1113
    - 2112:2112     
    environment:
      EVENTSTORE_CLUSTER_DNS: eventstore1
      EVENTSTORE_CLUSTER_SIZE: 3      
      EVENTSTORE_CLUSTER_GOSSIP_PORT: 2112   
      EVENTSTORE_EXT_IP_ADVERTISE_AS: 172.19.0.2 

  eventstore2:
    image: eventstore/eventstore:release-4.1.0
    hostname: eventstore2
    environment:
      EVENTSTORE_CLUSTER_DNS: eventstore1
      EVENTSTORE_CLUSTER_SIZE: 3      
      EVENTSTORE_CLUSTER_GOSSIP_PORT: 2112      
      EVENTSTORE_EXT_IP_ADVERTISE_AS: 172.19.0.3  

  eventstore3:
    image: eventstore/eventstore:release-4.1.0
    hostname: eventstore3
    environment:
      EVENTSTORE_CLUSTER_DNS: eventstore1
      EVENTSTORE_CLUSTER_SIZE: 3      
      EVENTSTORE_CLUSTER_GOSSIP_PORT: 2112      
      EVENTSTORE_EXT_IP_ADVERTISE_AS: 172.19.0.4

That is for test only and further network settings could be required.

Hope this helps.

Riccardo

Upvotes: 1

Related Questions