[Submission] MQTT Library and Sample

Hi all,

I have been playing around with Nick O’leary’s PubSubClient for MQTT (Arduino based) and now have it running on the :spark:. Only a small number of mods to get it up and running.

For the broker I used Mosquitto (mosquitto.org) which supports several OS’s and is very quick to get up and running. I have the library working for unauthenticated pub/sub to the broker as well as user id and password based authentication.

Once connected it is very simple to publish to a queue using

client.publish("Spark Status","I'm Alive...");

Or subscribe to an exiting queue / topic using

client.subscribe("led");

There is a call back function that is used to trap incoming messages for a subscribed topic which can be tailored to do what you need.

I have attached the libary and an sample with callback below. Right now all it does is allow you to control the led on D7. Send a mesage to topic led and use on, off or flash as the payload.

Publishing to a queue

To post a message to a topic you can use the mosquitto_pub tool and specify the topic and payload as follows

mosquitto_pub -h 192.168.0.50 led -m flash

or if using authentication

mosquitto_pub -h 192.168.0.50 -u userid -P password -t led -m flash

Subscribing to a queue

You can the mosquitto_sub program as a method to listen to topics - this might be a reasonable alternative to printing via the serial port for logging purposed. # is all topics.

mosquitto_sub -h 192.168.0.50 -t “#” -v

Library Code and Sample

Code below - you need to amend the ip address (byte arrange ip[]) and the client.connect call for your own use.

/*
  PubSubClient.cpp - A simple client for MQTT.
  Original Code - Nicholas O'Leary
  http://knolleary.net
  
  Adapted for Spark Core by Chris Howard - chris@kitard.com
  Based on PubSubClient 1.9.1
  
  Changes
  - Added gcc pragma to avoid warnings being thrown as errors (deprecated conversion from string constant to 'char*')
  - publish_P function removed due to lack of Arduino PROGMEN support on the Spark Core
  - Obvious includes commented out
  - Using Spark TCPClient instead of Arduino EthernetClient
  
*/

#pragma GCC diagnostic ignored "-Wwrite-strings"

// #include "PubSubClient.h"
// #include <Arduino.h>
#define ARDUINO_H
#include <stdint.h>
#include <stddef.h>
#include <stdlib.h>

//#include "Client.h"

// MQTT_MAX_PACKET_SIZE : Maximum packet size
#define MQTT_MAX_PACKET_SIZE 128

// MQTT_KEEPALIVE : keepAlive interval in Seconds
#define MQTT_KEEPALIVE 15

#define MQTTPROTOCOLVERSION 3
#define MQTTCONNECT     1 << 4  // Client request to connect to Server
#define MQTTCONNACK     2 << 4  // Connect Acknowledgment
#define MQTTPUBLISH     3 << 4  // Publish message
#define MQTTPUBACK      4 << 4  // Publish Acknowledgment
#define MQTTPUBREC      5 << 4  // Publish Received (assured delivery part 1)
#define MQTTPUBREL      6 << 4  // Publish Release (assured delivery part 2)
#define MQTTPUBCOMP     7 << 4  // Publish Complete (assured delivery part 3)
#define MQTTSUBSCRIBE   8 << 4  // Client Subscribe request
#define MQTTSUBACK      9 << 4  // Subscribe Acknowledgment
#define MQTTUNSUBSCRIBE 10 << 4 // Client Unsubscribe request
#define MQTTUNSUBACK    11 << 4 // Unsubscribe Acknowledgment
#define MQTTPINGREQ     12 << 4 // PING Request
#define MQTTPINGRESP    13 << 4 // PING Response
#define MQTTDISCONNECT  14 << 4 // Client is Disconnecting
#define MQTTReserved    15 << 4 // Reserved

#define MQTTQOS0        (0 << 1)
#define MQTTQOS1        (1 << 1)
#define MQTTQOS2        (2 << 1)

class PubSubClient {
private:
   //Client* _client;
   TCPClient* _client; // CH 14Jan2014 - changed Client* to TCPClient*
   uint8_t buffer[MQTT_MAX_PACKET_SIZE];
   uint16_t nextMsgId;
   unsigned long lastOutActivity;
   unsigned long lastInActivity;
   bool pingOutstanding;
   void (*callback)(char*,uint8_t*,unsigned int);
   uint16_t readPacket(uint8_t*);
   uint8_t readByte();
   bool write(uint8_t header, uint8_t* buf, uint16_t length);
   uint16_t writeString(char* string, uint8_t* buf, uint16_t pos);
   uint8_t *ip;
   char* domain;
   uint16_t port;
public:
   PubSubClient();
   PubSubClient(uint8_t *, uint16_t, void(*)(char*,uint8_t*,unsigned int),TCPClient& client); // CH 14Jan2014 - changed Client& to TCPClient&
   PubSubClient(char *, uint16_t, void(*)(char *,uint8_t*,unsigned int),TCPClient&  client); // CH 14Jan2014 - changed Client& to TCPClient&
   //bool connect(const char *);
   bool connect(char *);
   bool connect(char *, char *, char *);
   bool connect(char *, char *, uint8_t, uint8_t, char *);
   bool connect(char *, char *, char *, char *, uint8_t, uint8_t, char *);
   void disconnect();
   bool publish(char *, char *);
   bool publish(char *, uint8_t *, unsigned int);
   bool publish(char *, uint8_t *, unsigned int, bool);
   bool subscribe(char *);
   bool subscribe(char *, uint8_t qos);
   bool unsubscribe(char *);
   bool puback(uint16_t msgId);
   bool loop();
   bool connected();
};


#include <string.h>

PubSubClient::PubSubClient() {
   this->_client = NULL;
}

PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, void (*callback)(char*,uint8_t*,unsigned int), TCPClient& client) { // CH 14Jan2014 - Changed Client& to TCPClient&
   this->_client = &client;
   this->callback = callback;
   this->ip = ip;
   this->port = port;
   this->domain = NULL;
}

PubSubClient::PubSubClient(char* domain, uint16_t port, void (*callback)(char*,uint8_t*,unsigned int), TCPClient& client) { // CH 14Jan2014 - Changed Client& to TCPClient&
   this->_client = &client;
   this->callback = callback;
   this->domain = domain;
   this->port = port;
}


// CONNECT

//bool PubSubClient::connect(const char *id) {
bool PubSubClient::connect(char *id) {
   return connect(id,NULL,NULL,0,0,0,0);
}

bool PubSubClient::connect(char *id, char *user, char *pass) {
   return connect(id,user,pass,0,0,0,0);
}

bool PubSubClient::connect(char *id, char* willTopic, uint8_t willQos, uint8_t willRetain, char* willMessage)
{
   return connect(id,NULL,NULL,willTopic,willQos,willRetain,willMessage);
}

bool PubSubClient::connect(char *id, char *user, char *pass, char* willTopic, uint8_t willQos, uint8_t willRetain, char* willMessage) {
   if (!connected()) {
      int result = 0;
      
      if (domain != NULL) {
        result = _client->connect(this->domain, this->port);
      } else {
        result = _client->connect(this->ip, this->port);
      }
      
      if (result) {
         nextMsgId = 1;
         uint8_t d[9] = {0x00,0x06,'M','Q','I','s','d','p',MQTTPROTOCOLVERSION};
         // Leave room in the buffer for header and variable length field
         uint16_t length = 5;
         unsigned int j;
         for (j = 0;j<9;j++) {
            buffer[length++] = d[j];
         }

         uint8_t v;
         if (willTopic) {
            v = 0x06|(willQos<<3)|(willRetain<<5);
         } else {
            v = 0x02;
         }

         if(user != NULL) {
            v = v|0x80;

            if(pass != NULL) {
               v = v|(0x80>>1);
            }
         }

         buffer[length++] = v;

         buffer[length++] = ((MQTT_KEEPALIVE) >> 8);
         buffer[length++] = ((MQTT_KEEPALIVE) & 0xFF);
         length = writeString(id,buffer,length);
         if (willTopic) {
            length = writeString(willTopic,buffer,length);
            length = writeString(willMessage,buffer,length);
         }

         if(user != NULL) {
            length = writeString(user,buffer,length);
            if(pass != NULL) {
               length = writeString(pass,buffer,length);
            }
         }
         
         write(MQTTCONNECT,buffer,length-5);
         
         lastInActivity = lastOutActivity = millis();
         
         while (!_client->available()) {
            unsigned long t = millis();
            if (t-lastInActivity > MQTT_KEEPALIVE*1000UL) {
               _client->stop();
               return false;
            }
         }
         uint8_t llen;
         uint16_t len = readPacket(&llen);
         
         if (len == 4 && buffer[3] == 0) {
            lastInActivity = millis();
            pingOutstanding = false;
            return true;
         }
      }
      _client->stop();
   }
   return false;
}

uint8_t PubSubClient::readByte() {
   while(!_client->available()) {}
   return _client->read();
}

uint16_t PubSubClient::readPacket(uint8_t* lengthLength) {
   uint16_t len = 0;
   buffer[len++] = readByte();
   uint32_t multiplier = 1;
   uint16_t length = 0;
   uint8_t digit = 0;
   do {
      digit = readByte();
      buffer[len++] = digit;
      length += (digit & 127) * multiplier;
      multiplier *= 128;
   } while ((digit & 128) != 0);
   *lengthLength = len-1;
   for (uint16_t i = 0;i<length;i++)
   {
      if (len < MQTT_MAX_PACKET_SIZE) {
         buffer[len++] = readByte();
      } else {
         readByte();
         len = 0; // This will cause the packet to be ignored.
      }
   }

   return len;
}

bool PubSubClient::loop() {
   if (connected()) {
      unsigned long t = millis();
      if ((t - lastInActivity > MQTT_KEEPALIVE*1000UL) || (t - lastOutActivity > MQTT_KEEPALIVE*1000UL)) {
         if (pingOutstanding) {
            _client->stop();
            return false;
         } else {
            buffer[0] = MQTTPINGREQ;
            buffer[1] = 0;
            _client->write(buffer,2);
            lastOutActivity = t;
            lastInActivity = t;
            pingOutstanding = true;
         }
      }
      if (_client->available()) {
         uint8_t llen;
         uint16_t len = readPacket(&llen);
         uint16_t msgId = 0;
         uint8_t *payload;
         if (len > 0) {
            lastInActivity = t;
            uint8_t type = buffer[0]&0xF0;
            if (type == MQTTPUBLISH) {
               if (callback) {
                  uint16_t tl = (buffer[llen+1]<<8)+buffer[llen+2];
                  char topic[tl+1];
                  for (uint16_t i=0;i<tl;i++) {
                     topic[i] = buffer[llen+3+i];
                  }
                  topic[tl] = 0;
                  // msgId only present for QOS>0
                  if (buffer[0]&MQTTQOS1) {
                    msgId = (buffer[llen+3+tl]<<8)+buffer[llen+3+tl+1];
                    payload = buffer+llen+3+tl+2;
                    callback(topic,payload,len-llen-3-tl-2);
                    puback(msgId);
                  } else {
                    payload = buffer+llen+3+tl;
                    callback(topic,payload,len-llen-3-tl);
                  }
               }
            } else if (type == MQTTPINGREQ) {
               buffer[0] = MQTTPINGRESP;
               buffer[1] = 0;
               _client->write(buffer,2);
            } else if (type == MQTTPINGRESP) {
               pingOutstanding = false;
            }
         }
      }
      return true;
   }
   return false;
}


// PUBLISH

bool PubSubClient::publish(char* topic, char* payload) {
   return publish(topic,(uint8_t*)payload,strlen(payload),false);
}

bool PubSubClient::publish(char* topic, uint8_t* payload, unsigned int plength) {
   return publish(topic, payload, plength, false);
}

bool PubSubClient::publish(char* topic, uint8_t* payload, unsigned int plength, bool retained) {
   if (connected()) {
      // Leave room in the buffer for header and variable length field
      uint16_t length = 5;
      length = writeString(topic,buffer,length);
      uint16_t i;
      for (i=0;i<plength;i++) {
         buffer[length++] = payload[i];
      }
      uint8_t header = MQTTPUBLISH;
      if (retained) {
         header |= 1;
      }
      return write(header,buffer,length-5);
   }
   return false;
}

bool PubSubClient::write(uint8_t header, uint8_t* buf, uint16_t length) {
   uint8_t lenBuf[4];
   uint8_t llen = 0;
   uint8_t digit;
   uint8_t pos = 0;
   uint8_t rc;
   uint8_t len = length;
   do {
      digit = len % 128;
      len = len / 128;
      if (len > 0) {
         digit |= 0x80;
      }
      lenBuf[pos++] = digit;
      llen++;
   } while(len>0);

   buf[4-llen] = header;
   for (int i=0;i<llen;i++) {
      buf[5-llen+i] = lenBuf[i];
   }
   rc = _client->write(buf+(4-llen),length+1+llen);
   
   lastOutActivity = millis();
   return (rc == 1+llen+length);
}

bool PubSubClient::subscribe(char* topic) {
  return subscribe(topic, 0);
}

// SUBSCRIBE

bool PubSubClient::subscribe(char* topic, uint8_t qos) {
   if (qos < 0 || qos > 1)
     return false;

   if (connected()) {
      // Leave room in the buffer for header and variable length field
      uint16_t length = 5;
      nextMsgId++;
      if (nextMsgId == 0) {
         nextMsgId = 1;
      }
      buffer[length++] = (nextMsgId >> 8);
      buffer[length++] = (nextMsgId & 0xFF);
      length = writeString(topic, buffer,length);
      buffer[length++] = qos;
      return write(MQTTSUBSCRIBE|MQTTQOS1,buffer,length-5);
   }
   return false;
}

bool PubSubClient::puback(uint16_t msgId) {
  if(connected()) {
    // Leave room in the buffer for header and variable length field
    uint16_t length = 5;
    buffer[length++] = (msgId >> 8);
    buffer[length++] = (msgId & 0xFF);
    return write(MQTTPUBACK,buffer,length-5);
  }
  return false;
}

// HELPERS

//bool PubSubClient::unsubscribe(char* topic) {
bool PubSubClient::unsubscribe(char* topic) {
   if (connected()) {
      uint16_t length = 5;
      nextMsgId++;
      if (nextMsgId == 0) {
         nextMsgId = 1;
      }
      buffer[length++] = (nextMsgId >> 8);
      buffer[length++] = (nextMsgId & 0xFF);
      length = writeString(topic, buffer,length);
      return write(MQTTUNSUBSCRIBE|MQTTQOS1,buffer,length-5);
   }
   return false;
}

void PubSubClient::disconnect() {
   buffer[0] = MQTTDISCONNECT;
   buffer[1] = 0;
   _client->write(buffer,2);
   _client->stop();
   lastInActivity = lastOutActivity = millis();
}

uint16_t PubSubClient::writeString(char* string, uint8_t* buf, uint16_t pos) {
   char* idp = string;
   uint16_t i = 0;
   pos += 2;
   while (*idp) {
      buf[pos++] = *idp++;
      i++;
   }
   buf[pos-i-2] = (i >> 8);
   buf[pos-i-1] = (i & 0xFF);
   return pos;
}


bool PubSubClient::connected() {
   bool rc;
   if (_client == NULL ) {
      rc = false;
   } else {
      rc = (int)_client->connected();
      if (!rc) _client->stop();
   }
   return rc;
}


// MAIN APPLICATOIN CODE STARTS HERE

// Update these with values suitable for your network.
byte ip[] = { 192, 168, 0, 50 };
int LED = D7; // for demo only

void callback(char* topic, byte* payload, unsigned int length) {
  // handle message arrived - we are only subscribing to one topic so assume all are led related
  
    byte ledOn[] = {0x6F, 0x6E}; // hex for on
    byte ledOff[] = {0x6F, 0x66, 0x66}; // hex for off
    byte ledFlash[] ={0x66, 0x6C, 0x61, 0x73, 0x68}; // hex for flash
    
    if (!memcmp(ledOn, payload, sizeof(ledOn)))
        digitalWrite(LED, HIGH);
        
    if (!memcmp(ledOff, payload, sizeof(ledOff)))
        digitalWrite(LED, LOW);
        
    if (!memcmp(ledFlash, payload, sizeof(ledFlash))) {
        for (int flashLoop=0;flashLoop < 3; flashLoop++) {
            digitalWrite(LED, HIGH);
            delay(250);
            digitalWrite(LED, LOW);
            delay(250);
            
        }
    }
}


TCPClient tcpClient;
PubSubClient client(ip, 1883, callback, tcpClient);


// Simple MQTT demo to allow the blue led (D7) to be turned on or off. Send message to topic "led" with payload of "on" or "off"

void setup()
{
    
    pinMode(LED, OUTPUT); // Use for a simple test of the led on or off by subscribing to a topical called led

  //if (client.connect("Spark")) { // Anonymous authentication enabled
  if (client.connect("spark", "userid", "password")) { // uid:pwd based authentication
    client.publish("Spark Status","I'm Alive...");
    client.subscribe("led");
  }
}

void loop()
{
  client.loop();
}

I am sure the callback can be improved on by somebody with more experience, but it works for now. Full credit to Nick for his original MQTT library.

Cheers
Chris

19 Likes

Massive thank you!

I’m building some home automation software (yes, another one), and have been using MQTT to communicate between “things” and the server. It was my goal to use MQTT on the spark.

1 Like

Yes, another massive Thanks for your work.
Looking forward to trying this out…

My Mosquitto broker is out on the net, should this code work as is with a real world IP address?

(Sorry if this is a basic Core question rather than an MQTT question).

Thanks again for the code, no way I could have worked all this out.

Cheers.

Hi TBG - as far as I know there should be no issue connecting to a public broker, you might need to allow incoming port 1883 on your router.

To set the IP address just change the line

byte ip[] = { 192, 168, 0, 50 };

PubSubClient is overloaded and “should” accept either a byte array in dot decimal format as above or alternatively as a char array to pass in a server / domain name. If domain is !null then it will attempt to connect using domain name or drop through to IP address.

if (domain != NULL) {
    result = _client->connect(this->domain, this->port);
} else {
    result = _client->connect(this->ip, this->port);
}

I have also tried this with IBM’s MQTT broker with no issues, and MyMQTT on Android, which is a simple MQTT client that can be set up to send a pub message to the core, or receive on a sub topic.

Hope this helps - let me know if you have any issues.

Txs
Chris

1 Like

Hey. First off, amazing job!
I was wondering though if you’ve had any problems with the Spark Core timing out? After about three hours of publishing, I get a “Client Spark has exceeded timeout, disconnecting.” in the Mosquitto broker window. I don’t know if it matters, but I’m publishing about 75 times a second.

Hi @Shadow6363 - sorry traveling the last couple of days so just getting to his now. Is the core losing wifi during the three hours (flashing green to re-connect) ?

This maybe routed in the underlying issue that exists with the TI wifi chip in use on the core. I haven’t check on the progress the team have been making to address this.

As for the publish rate I don’t think this is a major issue - unless you are blocking long enough for the core to think it has lost connectivity (doesn’t sound like).

@Kitard: That’s the strange thing, when I go and check on it, it’s still “breathing” cyan.

Here’s the relevant code:

// MAIN APPLICATOIN CODE STARTS HERE

#define address 0x1E // Address of HMC5883L

byte ip[] = { 192, 168, 1, 100 };

void callback(char* topic, byte* payload, unsigned int length) {}


TCPClient tcpClient;
PubSubClient client(ip, 1883, callback, tcpClient);

int16_t y;

void setup() {
    Wire.begin();

    Wire.beginTransmission(address);    // Open communication with HMC5883
    Wire.write(0x00);                   // Select Configuration Register A
    Wire.write(0x38);                   // 2 Averaged Samples at 75Hz
    Wire.endTransmission();

    Wire.beginTransmission(address);    // Open communication with HMC5883
    Wire.write(0x01);                   // Select Configuration Register B
    Wire.write(0x00);                   // Set Highest Gain
    Wire.endTransmission();

    Wire.beginTransmission(address);    // Open communication with HMC5883
    Wire.write(0x02);                   // Select Mode Register
    Wire.write(0x00);                   // Continuous Measurement Mode
    Wire.endTransmission();
}

void loop() {
    if(client.loop()) {
        // Tell the HMC5883L where to begin reading data
        Wire.beginTransmission(address);
        Wire.write(0x03); // Select register 3, X MSB Register
        Wire.endTransmission();

        Wire.requestFrom(address, 6);
        if(Wire.available() == 6) {
            Wire.read(); Wire.read(); Wire.read(); Wire.read(); // Ignore X and Z Registers
            y  = Wire.read() << 8; // Y MSB
            y |= Wire.read();      // Y LSB
        }

        char buffer [6];

        sprintf(buffer, "%d", map(y, -1500, -500, -512, 512));

        if(!client.publish("magnetometer", buffer)) {
            client.disconnect();
            client.connect("Spark");
        } else {
            delay(10);
        }
    } else {
        client.disconnect();
        client.connect("Spark");
    }
}

I wasn’t too certain what would happen in an error situation which is why you see a bunch of the disconnect and reconnect blocks hoping one of them would fix the problem, but so far none of them have. I would think as long as the loop is still running and there’s a Wi-Fi connection (given the light, there should be), it would inevitably reconnect at some point.

Anyway, do you happen to know if there’s some way to log the status of the Spark Core itself? Some way to indicate if it’s lost the Wi-Fi connection or has stopped running the main loop?

Thanks for any help and again for your library!

Hey Guys!

Just wanted to pop in and mention there was a firmware release earlier this last week that includes a bunch of improvements for recovering after dropped connections. If you were going to upgrade – make sure you modify some small part of your program (add a space, or something), to force it to re-compile, and then flash.

Thanks!
David

Thanks for the update @Dave; however, it sadly doesn’t seem to have made a difference for me. :confused: Hoping my problem might be due to whatever you guys are working with TI to fix and it’ll resolve itself once that’s been implemented.

1 Like

Thanks @Kitard for adapting this for the Core! I got it up and running pretty easily. But I’m stuck trying to figure out how to use the callback in a way that makes sense.

Converting all of the possible payloads to hex to do a memory comparison is really unappealing. Is there a cleaner method? My C++ is awful and I’m just cobbling things together. Is there a way to convert that byte* payload into something closer to a string or char for comparison?

/* pseudo code */

if (topic == "control/power") {
    if (payload == "on") {
        powerOn();
    } else if (payload == "off") {
        powerOff();
    }
}

Hey @Kitard thanks for the work. It works all fine for me as long as I am connected to the spark cloud. But I do have a few queries here.

  1. Is there any library available for generating certificates and PSK for authentication as in the mosquitto?
  2. When connected to the spark cloud, I can see the IP assigned to the core in my router’s homepage. But cannot ping that IP from a device in the network ??
  3. Once the code is flashed, I removed the internet uplink from my router and the connection to the cloud is lost. OK I got that but the network is still there and MQTT does not require the cloud to work, so why does it get disconnected from the broker running in the local network.
    Is it like if not connected to the cloud, the core cannot connect to the network at all? What if the spark cloud has access restriction from my network. I want to flash the core over USB and use it in the network. I am not speaking of a local cloud/server or anything as such, just a local network The Mosquitto client or the android MyMQTT client with the Mosquitto broker running locally does not require internet to communicate, right !!. This question is also applicable to all communication over wifi that does not require internet access.
  4. Also can you suggest me a C library for the same (MQTT) as well as for all other functionality in the core. I am a little edgy with C++. With C, I can also try to contribute a bit to this.

Thanks,
Gaurav

1 Like

Hi Chris, nice work you have done, thank you.

I am trying to connect my Core to cloudMQTT.com, but I am having trouble with the authentication.

Looking at your code, the client.connect function requires a uid, username and password.

MQTT doesn’t seem to allow for a uid, or at least I can’t figure out how to use this field properly. I can connect fine through a Ruby script using my cloudMQTT username and password, on a non-SSL port.

Could you perhaps steer me towards the right direction to get authentication to succeed?

Thank you!

Hi @gaurav

Just a comment on the above: the last big patch from TI to the WiFi chip (CC3000) removed the ability to autonomously answer ping requests, so it is not unusual that you cannot ping your core.

1 Like

@futureshocked: uncomment the anonymous authentication and comment the next line.

Cheers!!
Gaurav

Hello,
Please pardon the newbie question. I’m coming from the Arduino world. The code sample that Kitard posted seems to be the entire MQTT library plus the setup and loop routine at the end.

So, if I wanted to use MQTT, does my spark code actually have the entire MQTT library in the code just like Kitard posted, or do I use a #include somehow? Not sure where Spark goes to look for the #include libraries, since it’s being programmed from the cloud.

Thanks, and pardon again for the newbie question.

[edit] It works!

Hello @butters,

You are absolutely right. The code Kitard posted is the Library+Main Application on a single file. It is an adaptation from Nick O’leary’s MQTT library for Arduino where you have a separate library (a .h and a .cpp file) + the main application. Since spark core is using the same wiring cpp wrapper, hence the code is almost the same apart from a few minor changes. You can make separate .h files for declaration and .cpp file for the definitions and the main application if you want to. You have the complete Spark core library available here Spark @Github. Only you might have to modify the makefile to include the added libraries to compile along with your main application.

You can have a taste of it if you can test the MQTT on an Arduino board because the Arduino IDE does it all for you, just add the libraries in the appropriate folder and the IDE with take it from there. For Spark if you are using the cloud based IDE, I think the core libraries are added by default. I am sorry I never used it much, so I am a little short of knowledge on this. Spark elites will be able to enlighten you properly. However I got a new mail from the spark team about the development of a Spark IDE but haven’t had the time to really check it out. Here’s the URL for the same @Spark-Dev.

Cheers !!
Gaurav

Holy crap! It works! Hahaha. Thanks! Sparkcores rock.

Hello,
I've been playing with MQTT some more. I noticed two behaivors, one of which has been a show stopper for me.

  1. If I do Serial.begin(9600) in setup, it prevents MQTT subscribe and publish from working. No idea why. I was using the serial to get Serial.println(), but found out it was interring with MQTT.

  2. This one is a major roadblock for me. I need to subscribe to multiple topics and be able to extract topic names (integers) and messages (floats). This works fine with Arduino and ethernet shield, but the same code has issues with Spark:

void callback(char* topic, byte* payload, unsigned int length) {
int mytopic = atoi (topic);
payload[length] = '\0';
float msg = atof( (const char *) payload);
}

The "mytopic" comes out correct. I'm using integer topic names. But the float "msg" doesn't seem to be working. I can't tell what the atof function is evaluating payload into. Since I can't do Serial.println, it's been difficult to debug. I know that if ((msg > 0) && (msg <100)) evaluates to false even though I published a message that's 50.1, for example.

For those who are wondering, on the Arduino side, the byte* payload looks like this:

client.publish("999", "555");
topic=999
payload=555999

Which is why a character return '\0' is added at location "length(payload)", so that the atof cuts off at the boundary between 5's and 9's. The guy who wrote this is pretty smart, so there's probably a reason why payload has the topic appended to the end of it. I just don't know why.

Perhaps there's something different about the Spark implementation? Or perhaps '\0' is not the correct end of line character? Something about endians maybe?

Note, I've also tried assuming payload is really just the message itself, and not append the \0, but that didn't work either.

EDIT: Looking at the example, it looks like payload contains only the actual message. Sorry about the confusion.

First MQTT Libraries in general.

I’ve been using Chris Howard’s library but struggling with reliability the core would crash after around 23 hours. Not knowing if it’s a problem with the library or with the underling Spark Core TCP stack i was just scheduling a core reboot every 12 hours. Also with almost 24 hours in each run it’s a pain to debug even if i knew where to look. But recently I ran into another bug, when subscribing to a topic that has been set with a retain flag the core will crash shortly after when the retained messages are sent to the core.

So I spent a few hours on this last night and have moved over to the hirotakaster MQTT library it’s publicly in the build interface and also over here. https://github.com/hirotakaster/MQTT. It’s also a port of Nicholas O’Leary Arduino library but appears to have more Spark specific changes.

It seems to not have the retained message bug i recently discovered, as far as overall reliability go’s i’ll get back to you on that in a day or two.

Your issues specifically.

  1. Haven’t come across the serial problem.

  2. Not sure why your float code isn’t working i’ve got something similar for longs that works fine.

    payload[length] = ‘\0’;
    char* cstring = (char *) payload;
    long n = atol(cstring);

you could try sending a float from the mosquitto command line, to see if it’s your sending rather than the receiving code and you could also monitor the same topic with another mqtt client to see what your are getting sent.

mosquitto_pub -t 999 -m 50.1
1 Like

Hi,
Thanks for the input. I’ll see if I could get the other library running.

I’m pretty certain my MQTT publishes are coming in right. I’m using command line to publish, and another command window to monitor what I’m publishing. Also, I know my regular Arduino work, so my test method is pretty sound.

I might try your char *(cstring) = (char *) payload. I don’t do that before running atof.

I really think Spark should work on the MQTT library. The platform hardware is so well suited for IoT, but there’s these glaring deficiencies - MQTT support being a big one.