【MQTTnet】C#でMQTTを扱う

f:id:newbie29979:20210113193113p:plain

はじめに

Raspberry piなどでモノとモノを連携させる際におなじみのプロトコルであるMQTTだが,それをC#の.NETを使ってどのように実装するのかについての記事は少ない(体感).大抵はpythonでどうにかなってしまう.
本記事はそのような中でもC#で,.NETを使ってMQTTのsubscribe/publishをしなければならないとき助けとなれば,と思い書く.
ベースとなっているのは以下の記事なので,良ければこちらも参考にしてほしい.
www.rect29.com

背景

C#でMQTTを扱えるライブラリを検索すると必ずといってほどpaho.mqttがヒットする.
github.com
しかしこのライブラリ,更新がだいぶ古く少し怪しい.そこで本記事ではMQTTnetを使った実装を紹介していく.
github.com
MQTTブローカーはおなじみのBeebotte

実装

本記事では,Publisher/Subscriberの2つに分けて取り上げる.

共通の実装

Publisher/Subscriberともに,MQTTブローカーを扱うにはまずChannelに接続するためChannelのオプションを設定する必要がある.公式のWikiには以下のように設定するよう例が記述されている.

// Create TCP based options using the builder.
var options = new MqttClientOptionsBuilder()
    .WithClientId("Client1")
    .WithTcpServer("broker.hivemq.com")
    .WithCredentials("bud", "%spencer%")
    .WithTls()
    .WithCleanSession()
    .Build();

さて今回の実装はBeeBotteへの接続である.これについては様々な実装があるが,例えば接続が必要なパラメータを以下のようにjsonとして実装する.

{
  "Host": "MQTT_HOST",
  "Topic": "TOPIC",
  "Token": "TOKEN",
  "Port": PORT_NUM
}

Hostはbeebotteを使う場合では"mqtt.beebotte.com",Topicは"[Channel]/[Resource]",Tokenは"[Channelに発行されたToken]",Portは"8883"となる.これを踏まえた実装としては以下のように書ける.

Options = new MqttClientOptionsBuilder()
    .WithTcpServer(Prop.Host, Prop.Port)
    .WithCredentials(Prop.Token, "")
    .WithTls()
    .WithCleanSession()
    .Build();

jsonでは定義されているが実装では使われていないように見えるプロパティである"Topic"はConnect()のイベントハンドラの登録時に使う.
Connect()のためのOptionsは定義したので,あとは関連するメソッドを定義すればよい.

Channelに接続するためのイベントハンドラ登録メソッド.

public virtual void OnStarted()
{
    MqttClient.UseConnectedHandler(async e =>
    {
        _ = await MqttClient.SubscribeAsync(new TopicFilterBuilder()
            .WithTopic(Prop.Topic)
            .Build());
    });
    MqttClient.UseDisconnectedHandler(async e =>
    {
        await Task.Delay(TimeSpan.FromSeconds(3));
        try
        {
            Connect();
        }
        catch { }
    });
}

実際にChannelに接続するメソッド.

public async void Connect()
{
    var retry = 0;
    while (!MqttClient.IsConnected && retry < 10)
    {
        try
        {
            MqttClient.ConnectAsync(Options, CancellationToken.None).Wait();
        }
        catch
        {
            await Task.Delay(TimeSpan.FromSeconds(1));
        }
        ++retry;
    }
}

以上はPublisher/Subscriberともに必要なので,このクラスを継承するようにPublisher/Subscriberを実装していく.とはいえ実装の半分はこれで完了したようなものである.

Publisher

既にChannelへの接続はできているのでPublisherで実装するべきはデータを載せるだけ.例えばこんな感じ.

public void PublishUrl(string url)
{
    var dataModel = new PublishedDataModel(url, DateTime.Now.ToString("yyyyMMddHHmmss") ,false);
    var message = new MqttApplicationMessageBuilder()
        .WithTopic(Prop.Topic)
        .WithPayload(JsonConvert.SerializeObject(dataModel))
        .WithAtLeastOnceQoS()
        .WithRetainFlag()
        .Build();

    MqttClient.PublishAsync(message, CancellationToken.None);
}

dataModelは今回の成果物のオブジェクトなので実装の本質ではない.このオブジェクトはjsonとしてPayloadに載せる.メッセージの実体はこのPayloadになる.Publish時には.WithTopic()でTopicを指定するのを忘れないこと.

Subscriber

Subscribeはほぼ実装できているようなもの.実装しなければならないのは,SubscribeしているChannelにデータが流れてきたときに何をするのかのイベントハンドラの登録.つまりこんな感じになる.

public override void OnStarted()
{
    base.OnStarted();

    MqttClient.UseApplicationMessageReceivedHandler(e =>
    {
        var appMessage = e.ApplicationMessage;
        var payload = Encoding.UTF8.GetString(appMessage.Payload, 0, appMessage.Payload.Length);
        try
        {
            var publishedDataModel = JsonConvert.DeserializeObject<PublishedDataModel>(payload);
            WebpageAccessor.Access(publishedDataModel.Data);
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex);
        }
    });
 }

重要なのはUseApplicationMessageReceivedHandler().これはMessageをReceiveしたときの挙動を定義するもの.payloadをutf-8で念のためencodeする.続く実装は成果物の実装であり本記事の本質ではない.
例外処理は...もっとマシな実装があるはず.

そのほか

実はこれを言ってしまうと元も子もないが,実は公式のWikiを参照するのが早い.
github.com

まとめ

C#でBeeBotteに接続する方法について紹介した.検索するとpaho.mqttがヒットしがちだが,このライブラリの最近の更新は見られないため,MQTTnetを使った実装を取り上げた.本記事ではClientをPublisher/Subscriberの2つに分けて紹介した.