Reputation: 1665
I am trying to do some test with RocketMQ, and I use C# with Newlife.RocketMQ. I started RocketMQ on my pc and I see the command window open and printing messages -- so that is ok.
So I wrote a simple C# program to test this:
This is the WPF window:
<Window x:Class="WpfApp1.MainWindow"
xmlns="http://schemas.microsoft.com/winfx/2006/xaml/presentation"
xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml"
xmlns:d="http://schemas.microsoft.com/expression/blend/2008"
xmlns:mc="http://schemas.openxmlformats.org/markup-compatibility/2006"
xmlns:local="clr-namespace:WpfApp1"
mc:Ignorable="d"
Title="MainWindow" Height="450" Width="800" >
<DockPanel >
<TextBox x:Name="txtProduce" Text="message 1" DockPanel.Dock="Top" Height="30" FontSize="16"/>
<Button x:Name="btnProduce" Content="Produce" HorizontalAlignment="Left" VerticalAlignment="Top" Width="75" Click="BtnProduce_Click" DockPanel.Dock="Top"/>
<DataGrid x:Name="dgProduce" Height="100" DockPanel.Dock="Top" IsReadOnly="True"/>
<Label Content="Consumer" DockPanel.Dock="Top"/>
<DataGrid x:Name="dgConsume" IsReadOnly="True"/>
</DockPanel>
</Window>
And now the C# code:
public partial class MainWindow : Window
{
Producer _producer;
Consumer _consumer;
ObservableCollection<SendResult> _results = new ObservableCollection<SendResult>();
object _resultsLock = new object();
ObservableCollection<MessageExt> _messages = new ObservableCollection<MessageExt>();
object _messagesLock = new object();
public MainWindow()
{
InitializeComponent();
dgProduce.ItemsSource = _results;
BindingOperations.EnableCollectionSynchronization(_results, _resultsLock);
dgConsume.ItemsSource = _messages;
BindingOperations.EnableCollectionSynchronization(_messages, _messagesLock);
_producer = new Producer
{
Topic = "TopicTest",
NameServerAddress = "localhost:9876",
RetryTimesWhenSendFailed = 10
};
_producer.Start();
_consumer = new Consumer
{
Topic = "TopicTest",
NameServerAddress = "localhost:9876"
};
_consumer.OnConsume = (q, ms) =>
{
foreach(var item in ms.ToList())
_messages.Add(item);
return true;
};
_consumer.Start();
}
private int _i = 1;
private void BtnProduce_Click(object sender, RoutedEventArgs e)
{
try
{
var content = DateTime.Now.ToString("yyyy/MM/dd HH:mm:ss.fff") + ":" + txtProduce.Text;
var message = new Message()
{
Body = System.Text.Encoding.Default.GetBytes(content),
Keys = (_i++).ToString(),
Tags = _i % 2 == 0 ? "even" : "odd",
Flag = 0,
WaitStoreMsgOK = true
};
var sr = _producer.Publish(message);
_results.Add(sr);
txtProduce.Text = "message " + _i;
}
catch (Exception ex)
{
MessageBox.Show(ex.Message);
}
}
}
And now I see problems:
Why this happen?
Upvotes: 1
Views: 92
Reputation: 11
question 1:after return true when consumed, the message will never comes to you again unless commit fail.
question 2: rocketmq has two ways of consuming: cluster and broadcast.
i.e: 2 consumer instances, 100 messages sent,
for cluster: 50 for consumer instance 1, another 50 for consumer instance 2
for broadcast: 100 for consumer instance 1, same 100 for consumer instance 2
consume doc : https://rocketmq.apache.org/docs/4.x/consumer/02push/#cluster-and-broadcast-mode
Upvotes: 0