// File: MqttApps/mqttLog/mqttLog/Worker.cs
// (81 lines, 3.0 KiB, C#)

namespace mqttLog
{
using log4net;
using Microsoft.Extensions.Logging;
//using log4net.Config;
using MQTTnet;
using System.Buffers;
using System.Text;
/// <summary>
/// Background service that connects to an MQTT broker, subscribes to the
/// <c>casa/#</c> topic tree and writes every received message to both the
/// Microsoft.Extensions.Logging pipeline and log4net.
/// </summary>
public class Worker : BackgroundService
{
    private static readonly ILog log = LogManager.GetLogger("MQTTlOG");

    private readonly ILogger<Worker> _logger;

    // Address of the MQTT broker passed to WithTcpServer.
    private readonly string server;

    /// <summary>
    /// Creates the worker.
    /// </summary>
    /// <param name="logger">Logger used for host-side diagnostics.</param>
    public Worker(ILogger<Worker> logger)
    {
        _logger = logger;
        server = "192.168.2.50";
    }

    /// <summary>
    /// Connects to the broker, subscribes to <c>casa/#</c> and then waits until the
    /// host requests shutdown, at which point the client is disconnected.
    /// </summary>
    /// <param name="stoppingToken">Signalled by the host when the service must stop.</param>
    /// <returns>A task that completes once the client has been shut down.</returns>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        log.Info("INICIO-------------------------------------");
        if (_logger.IsEnabled(LogLevel.Information))
        {
            _logger.LogInformation("Worker running at: {time}", DateTimeOffset.Now);
        }

        var mqttFactory = new MqttClientFactory();
        using (var client = mqttFactory.CreateMqttClient())
        {
            var mqttClientOptions = new MqttClientOptionsBuilder().WithTcpServer(server).Build();

            // Setup message handling before connecting so that queued messages
            // are also handled properly. When there is no event handler attached all
            // received messages get lost.
            client.ApplicationMessageReceivedAsync += e =>
            {
                onReceived(e);
                return Task.CompletedTask;
            };

            try
            {
                // Flow the stopping token so a shutdown request also aborts a pending connect.
                var connectResult = await client.ConnectAsync(mqttClientOptions, stoppingToken);
                if (connectResult.ResultCode == MqttClientConnectResultCode.Success)
                {
                    var topicFilter = new MqttTopicFilterBuilder().WithTopic("casa/#").Build();
                    var subscribeOptions = new MqttClientSubscribeOptionsBuilder().WithTopicFilter(topicFilter).Build();
                    await client.SubscribeAsync(subscribeOptions, stoppingToken);
                }
                else
                {
                    // Previously ignored silently; make the failed connect visible in both logs.
                    _logger.LogWarning("MQTT connection to {Server} failed: {ResultCode}", server, connectResult.ResultCode);
                    log.Warn($"MQTT connection to {server} failed: {connectResult.ResultCode}");
                }

                // Stay alive until the host asks us to stop; messages arrive via the event handler.
                await Task.Delay(Timeout.Infinite, stoppingToken);
            }
            catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
            {
                // Expected on shutdown: fall through so the client is disconnected
                // and the final log line is written.
            }
            finally
            {
                // Only disconnect a live connection; the connect itself may have failed.
                if (client.IsConnected)
                {
                    await client.DisconnectAsync();
                }
            }
        }

        log.Info("FINAL FELIZ-------------------------------------");
    }

    /// <summary>
    /// Decodes the payload of a received message as UTF-8 and logs it together with its topic.
    /// </summary>
    /// <param name="e">Event data carrying the received application message.</param>
    public void onReceived(MqttApplicationMessageReceivedEventArgs e)
    {
        // Decode the whole sequence in one call: decoding segment by segment corrupts
        // multi-byte characters that straddle a segment boundary.
        ReadOnlySequence<byte> payload = e.ApplicationMessage.Payload;
        string result = Encoding.UTF8.GetString(payload);
        string topic = e.ApplicationMessage.Topic;

        // Constant template with arguments: same rendered text, but structured.
        _logger.LogInformation("{Topic} --> {Payload}", topic, result);
        log.Info($"{topic} --> {result}");
    }
}
}