Reputation: 1612
I need to subscribe to a topic(topic is a channel) before publishing to a topic, but when creating a thread I need to run go Func to keep listening to channels to process messages (for example from publish or subscribe a new subscribe ) the test works (but not every time), sometimes when I run the test it ends up posting a message on the channel (topic) before I'm listening to the topic (channel)
i have this test:
func Test_useCase_publish(t *testing.T) {
for _, tt := range tests {
tt := tt
t.Run(, func(t *testing.T) {
t.Parallel() = &RepositoryMock{
GetTopicFunc: func(ctx context.Context, topicName vos.TopicName) (entities.Topic, error) {
return tt.fields.topic, nil
useCase := New(
subscribed := make(chan struct{})
go func() {
ch, _, err := useCase.Subscribe(tt.args.ctx, tt.args.message.TopicName)
require.NoError(t, err)
msg, ok := <-ch
if ok {
fmt.Println("msg", msg)
assert.Equal(t, tt.want, msg)
err := useCase.Publish(tt.args.ctx, tt.args.message)
assert.ErrorIs(t, err, tt.wantErr)
topic :
func (t Topic) Activate() {
go t.listenForSubscriptions()
go t.listenForMessages()
go t.listenForKills()
func (t *Topic) listenForSubscriptions() {
for newSubCh := range t.newSubCh {
t.Subscribers.Store(newSubCh.GetID(), newSubCh)
func (t *Topic) listenForKills() {
for subscriberID := range t.killSubCh {
func (t *Topic) listenForMessages() {
for msg := range t.newMessageCh {
m := msg
t.Subscribers.Range(func(key, value interface{}) bool {
if key == nil || value == nil {
return false
if subscriber, ok := value.(Subscriber); ok {
return true
func (t Topic) Dispatch(message vos.Message) {
t.newMessageCh <- message
func (t *Topic) listenForMessages() {
for msg := range t.newMessageCh {
m := msg
t.Subscribers.Range(func(key, value interface{}) bool {
if key == nil || value == nil {
return false
if subscriber, ok := value.(Subscriber); ok {
return true
func (u useCase) Subscribe(ctx context.Context, topicName vos.TopicName) (chan vos.Message, vos.SubscriberID, error) {
if err := topicName.Validate(); err != nil {
return nil, "", err
topic, err :=, topicName)
if err != nil {
if !errors.Is(err, entities.ErrTopicNotFound) {
return nil, "", err
topic, err = u.createTopic(ctx, topicName)
if err != nil {
return nil, "", err
subscriber := entities.NewSubscriber(topic)
subscriptionCh, id := subscriber.Subscribe()
return subscriptionCh, id, nil
subscriber := entities.NewSubscriber(topic)
subscriptionCh, id := subscriber.Subscribe()
return subscriptionCh, id, nil
func (s Subscriber) Subscribe() (chan vos.Message, vos.SubscriberID) {
return s.subscriptionCh, s.GetID()
func (s Subscriber) ReceiveMessage(msg vos.Message) {
s.subscriptionCh <- msg
publisher :
func (u useCase) Publish(ctx context.Context, message vos.Message) error {
if err := message.Validate(); err != nil {
return err
topic, err :=, message.TopicName)
if err != nil {
return err
return nil
when I call subscribe (I send a message to a subscribe to channel and add a subscribe to my thread) when I post a message to a topic I send a message to topic channel
Upvotes: 1
Views: 1908
Reputation: 52176
Some points are missing from the code you show, such as the code for .Subscribe()
and .Publish()
, or how the channels are instanciated (are they buffered/unbuffered ?).
One point can be said, though :
from the looks of (t *Topic) listenForSubscriptions()
: this subscribing method does not send any signal to the subscriber that it has been registered.
So my guess is : your useCase.Subscribe(...)
call has the information that the created channel has been written on newSubCH
, but it hasn't got the inforamtion that t.Subcribers.Store(...)
has completed.
So, depending on how the goroutines are scheduled, the message sending in your test function can occur before the channel has actually been registered.
To fix this, you add something that will send a signal back to the caller. One possible way :
type subscribeReq struct{
ch chan Message
done chan struct{}
// turn Topic.newSubCh into a chan *subscribeReq
func (t *Topic) listenForSubscriptions() {
for req := range t.newSubCh {
Another point : your test function does not check if the goroutine spun with your go func(){ ... }()
call completes at all, so your unit test process may also exit before the goroutine has had the chance to execute fmt.Println(msg)
A common way to check this is to use a sync.WaitGroup
t.Run(, func(t *testing.T) {
useCase := New(
subscribed := make(chan struct{})
wg := &sync.WaitGroup{} // create a *sync.WaitGroup
wg.Add(1) // increment by 1 (you start only 1 goroutine)
go func() {
defer wg.Done() // have the goroutine call wg.Done() when returning
// send message, check that no error occurs
wg.Wait() // block here until the goroutine has completed
Upvotes: 1