RabbitMQ 와 dotnet Worker Service
rabbitmq를 쓸 일이 있어서 정리해 보았다. 여기서는 consumer만 만들어서 처리하는 과정을 정리
rabbitmq를 실행
docker run --rm --hostname rabbitmq --name rabbitmq -p 30000:15672 -p 5672:5672 rabbitmq:3-management
localhost:30000으로 접속해서 확인해본다. 기본 아이디 비번은 guest/guest이다.
Worker Service Project 생성
dotnet worker service template을 사용하여 프로젝트를 생성한다.
worker service에 대한 설명은 생략한다. rabbitmq와 연동만 관심있음
nuget 설치
https://www.nuget.org/packages/RabbitMQ.Client
dotnet add package RabbitMQ.Client --version 6.2.1
Work 설정
EmainSenderWorker.cs
public class EmainSenderWorker : BackgroundService
{
private readonly ILogger<Worker> _logger;
private ConnectionFactory _connectionFactory;
private IConnection _connection;
private IModel _channel;
private const string _queueName = "email";
public Worker(ILogger<Worker> logger)
{
_logger = logger;
}
public override Task StartAsync(CancellationToken cancellationToken)
{
_connectionFactory = new ConnectionFactory
{
HostName = "localhost",
UserName = "guest",
Password = "guest",
DispatchConsumersAsync = true
};
_connection = _connectionFactory.CreateConnection();
_channel = _connection.CreateModel();
_channel.QueueDeclare(queue: _queueName,
durable: false,
exclusive: false,
autoDelete: false,
arguments: null);
_channel.BasicQos(0, 1, false);
_logger.LogInformation($"Queue [{_queueName}] is waiting for messages.");
return base.StartAsync(cancellationToken);
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
// 이부분에서 처리를 해야한다.
}
public override async Task StopAsync(CancellationToken cancellationToken)
{
await base.StopAsync(cancellationToken);
_connection.Close();
_logger.LogInformation("RabbitMQ connection is closed.");
}
}
StartAsync, StopAsync 두개 부분만 일단 구현했다. 이제 ExecuteAsync 구현해보자.
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
_logger.LogInformation("Email Sender Start...");
stoppingToken.ThrowIfCancellationRequested();
var consumer = new AsyncEventingBasicConsumer(_channel);
consumer.Received += async (bc, ea) =>
{
try
{
var message = Encoding.UTF8.GetString(ea.Body.ToArray());
var email = JsonConvert.DeserializeObject<EmailMessage>(message);
_logger.LogInformation($"Processing title: '{email.Title}'.");
EmailSender sender = new EmailSender
{
EmailHost = "smtp.gmail.com",
EmailPort = 587,
EmailUserName = "yourid",
EmailPassword = "your password",
EmailEnableSSL = true
};
await sender.SendEmailAsync(email);
_channel.BasicAck(ea.DeliveryTag, false);
}
catch (JsonException ex)
{
_logger.LogError($"JSON Parse Error {ex.Message}");
_channel.BasicNack(ea.DeliveryTag, false, false);
}
catch (Exception e)
{
_logger.LogError(default, e, e.Message);
_channel.BasicNack(ea.DeliveryTag, false, false);
}
};
_channel.BasicConsume(queue: _queueName, autoAck: false, consumer: consumer);
await Task.CompletedTask;
}
데이터 전달용으로 다음 사용
EmailMessage.cs
public class EmailMessage
{
public string Title { get; set; } = "";
public string Content { get; set; } = "";
public string EmailTo { get; set; } = "";
public List<string>? EmailCC { get; set; }
public byte[]? Attachment { get; set; }
public string? AttachmentName { get; set; }
}
EmailSender.cs
public class EmailSender
{
public string EmailHost = "";
public int EmailPort = 587; //gmail
public string EmailUserName = "";
public string EmailPassword = "";
public bool EmailEnableSSL = true;
public async Task SendEmailAsync(EmailMessage message)
{
try
{
var client = new SmtpClient(EmailHost, EmailPort)
{
Credentials = new NetworkCredential(EmailUserName, EmailPassword),
EnableSsl = EmailEnableSSL
};
var mail = new MailMessage() { IsBodyHtml = true };
mail.From = new MailAddress(EmailUserName);
mail.Subject = message.Title;
mail.Body = message.Content;
foreach (var item in message.EmailTo.Split(new[] { ";", "," }, StringSplitOptions.RemoveEmptyEntries))
{
mail.To.Add(item);
}
if (message.EmailCC != null)
{
foreach (var item in message.EmailCC)
{
mail.CC.Add(item);
}
}
if (message.Attachment != null && message.AttachmentName != null)
{
var stream = new MemoryStream(message.Attachment);
mail.Attachments.Add(new Attachment(stream, message.AttachmentName));
}
await client.SendMailAsync(mail);
}
catch (Exception ex)
{
throw ex;
}
}
}
테스트
실행하고 나면 대기하는것을 볼수 있다.
이제 웹사이트에서 메세지를 추가해보자.
consulers에 하나가 붙어잇는것을 볼수 있고
publish message에서 메세지를 하나 넣어보자.
{
title: "test",
content: "test",
emailTo: "support@xxxx.com",
emailCC: ["brian@xxxx.com"],
}
메세지를 publish 하자 마자 처리되는것을 볼수 있다.
이메일도 잘 들어왔다.
참고로 dockerfile
FROM mcr.microsoft.com/dotnet/sdk:5.0 AS build-env
WORKDIR /src
COPY ["EmailSender.csproj", ""]
RUN dotnet restore "./EmailSender.csproj"
COPY . .
WORKDIR /src
RUN dotnet publish "EmailSender.csproj" -c Release -o /app/publish
FROM mcr.microsoft.com/dotnet/runtime:5.0
WORKDIR /app
COPY --from=build-env /app/publish .
ENTRYPOINT ["dotnet", "EmailSender.dll"]
여러개의 worker를 사용하고 싶다.
program.cs
public static IHostBuilder CreateHostBuilder(string[] args) =>
Host.CreateDefaultBuilder(args).ConfigureServices((hostContext, services) =>
{
services.AddHostedService<EmainSenderWorker>();
services.AddHostedService<OtherWorker>();
});
참고
https://www.rabbitmq.com/dotnet-api-guide.html#consuming-async