messaging temps réel avec go
Embed Size (px)
TRANSCRIPT

Messaging temps réel avec GoMickaël Rémond
ProcessOne

Retour d'expérience
Construction d'objets connectés en Go

Introduction: Pourquoi utiliser Go pour lemessaging ?

Pertinence
Go est pertinent pour le messaging temps réel:
sur le client: objets connectés
sur le serveur: plate-forme de messaging

Pour les clients / objets connectés
Channels: Le passage de messages est au coeur du language Go.
Cross compilation: Il est possible de compiler et déployer le code client sur unearchitecture différente.
Accès système: La création d'objet est simplifiée par les capacités systèmes de Go.

Pour les serveurs
Performant sur deux axes:
Go routines: Connections simultanés
Les serveurs de messagerie gèrent de nombreux processus en parallèle
Performance: Débit
Permet de développer des systèmes ayant un fort débit en terme de nombre demessages par seconde.
Latence:
Permet de réduire fortement la latence dans la transmission des messages.

Exemple: NATS
gnatsd, implementation du protocole Nats.io

NATS: Capacité de traitement
Source: Brave New Geek

NATS: Latence
Source: Brave New Geek

Illustration du messaging avec divers protocoles
À connaître:
XMPP: eXtensible Messaging and Presence Protocol
xmpp.org (http://xmpp.org)
MQTT: Message Queuing Telemetry Transport
mqtt.org (http://mqtt.org)
NATS: Protocole et serveur
nats.io (http://nats.io)
Cette présentation se concentre sur l'utilisation des serveurs de messaging depuis unclient Go.
La mise en œuvre de serveurs de messaging en pur Go viendra dans une autreprésentation.

Code

XMPP
XMPP est idéal pour le contrôle des objets.
Le protocole est extensible et propose des spécifications adaptées à l'écriture deséquences d'interaction pour le contrôle:
Découverte (XEP-0347: Internet of Things - Discovery)
Contrôle: Requête / Réponse (XEP-0325: Internet of Things - Control)

Développement d'un Jukebox XMPP en Go
Technologies utilisées:
Protocole XMPP
ejabberd: serveur XMPP
Bibliothèque XMPP en Go, Gox: écriture du Jukebox
Raspberry Pi 2 avec image Linux custom et haut parleur connecté à la sortie sonanalogique
Connexion à SoundCloud

Architecture

Contrôle XMPP du jukebox: Chat bot
Le jukebox joue les liens Soundcloud envoyés dans le chat:
<message type="chat" to="[email protected]" id="aac9a"> <body>https://soundcloud.com/radiohead/spectre</body> </message>
Il stoppe la musique en cours de lecture avec la commande stop:
<message type="chat" to="[email protected]" id="aacaa"> <body>stop</body> </message>

Contrôle du jukebox: IoT XMPP Control
Notre jukebox peut interpréter les requêtes de commande IoT (XEP-0325):
<iq type='set' to='[email protected]/jukebox' id='2'> <set xmlns='urn:xmpp:iot:control' xml:lang='en'> <string name='action' value='play'/> <string name='url' value='https://soundcloud.com/radiohead/spectre'/> </set> </iq>

Code du jukebox: Main receive loop
Déclaration de la connection XMPP:
var client *xmpp.Client var err error if client, err = connectXmpp(*jid, *password, *address); err != nil { log.Fatal("Could not connect to XMPP: ", err) }
Boucle de traitement des paquets XMPP:
for packet := range client.Recv() { switch packet := packet.(type) { case *xmpp.ClientMessage: processMessage(client, p, packet) case *xmpp.ClientIQ: processIq(client, p, packet) case *xmpp.ClientPresence: // Do nothing with received presence default: fmt.Printf("Ignoring packet: %T\n", packet) } }

Code du jukebox: Connexion XMPP
func connectXmpp(jid string, password string, address string) (client *xmpp.Client, err error) {
xmppOptions := xmpp.Options{Address: address,
Jid: jid, Password: password, PacketLogger: os.Stdout,
Retry: 10}
if client, err = xmpp.NewClient(xmppOptions); err != nil {
return
}
if _, err = client.Connect(); err != nil {
return
}
return
}

Traitement des messsages
func processMessage(client *xmpp.Client, p *mpg123.Player, packet *xmpp.ClientMessage) { command := strings.Trim(packet.Body, " ") if command == "stop" { p.Stop() } else { playSCURL(p, command) } }

Traitement des commandes
func processIq(client *xmpp.Client, p *mpg123.Player, packet *xmpp.ClientIQ) {
switch payload := packet.Payload.(type) {
// We support IOT Control IQ
case *iot.ControlSet:
var url string
for _, element := range payload.Fields {
if element.XMLName.Local == "string" && element.Name == "url" {
url = strings.Trim(element.Value, " ")
break
}
}
playSCURL(p, url)
setResponse := new(iot.ControlSetResponse)
reply := xmpp.ClientIQ{Packet: xmpp.Packet{To: packet.From, Type: "result", Id: packet.Id}, Payload: setResponse}
client.Send(reply.XMPPFormat())
default:
fmt.Printf("Other IQ Payload: %T\n", packet.Payload)
}
}

Jouer le morceau SoundCloud
func playSCURL(p *mpg123.Player, rawURL string) { songID, _ := soundcloud.GetSongID(rawURL) url := soundcloud.FormatStreamURL(songID) p.Play(url) }

MQTT
MQTT est idéal pour remonter l'information venant de capteurs.

Reporting d'un capteur de température
Technologies utilisées:
Capteur: Températeur CPU OSX
sysctl -n machdep.xcpm.cpu_thermal_level
Serveur MQTT de test: Mosquitto
Bibliothèque MQTT en Go: ProcessOne MQTT

Publisher: Mise en place de la connexion
func main() { client := mqtt.New("localhost:1883", nil) client.ClientID = "mremond-osx" if err := client.Connect(); err != nil { log.Fatal("Connection error: ", err) } ticker := time.NewTicker(5 * time.Second) stop := make(chan bool) go publishLoop(client, ticker, stop) runtime.Goexit() }

Publisher: La boucle de publication
func publishLoop(client *mqtt.Client, ticker *time.Ticker, stop <-chan bool) { for done := false; !done; { select { case <-ticker.C: payload := make([]byte, 1, 1) payload[0] = getTemp() client.Publish(getTopic(client.ClientID), payload) case <-stop: done = true break } } }

Publisher: La lecture de la température
func getTemp() byte {
out, err := exec.Command("sysctl", "-n", "machdep.xcpm.cpu_thermal_level").Output()
if err != nil {
log.Println("Cannot read CPU temperature: ", err)
return byte(0)
}
s := string(out)
if temp, err := strconv.ParseInt(strings.Trim(s, "\n"), 10, 32); err != nil {
return byte(temp)
}
return byte(0)
}

Subscriber
func main() {
messages := make(chan *mqtt.Message)
client := mqtt.New("localhost:1883", messages)
client.ClientID = "MQTT-Sub"
if err := client.Connect(); err != nil {
fmt.Printf("Connection error: %q\n", err)
return
}
name := "mremond-osx/cputemp"
topic := packet.Topic{Name: name, QOS: 1}
client.Subscribe(topic)
for m := range messages {
fmt.Printf("Received message on topic %s: %+v\n", m.Topic, m.Payload)
}
}

Prochaines étapes
Publication du code client MQTT:
Publication pour la fin de semaine, après nettoyage et documentation, sur le
compte Github de ProcessOne (https://github.com/processone/)
Présentation de ce que j'ai appris lors du design de l'API de la bibliothèque ?
Mise en oeuvre de serveurs de messaging en Go:
Présentation du serveur NATS ?: Illustration de NATS qui est un protocol non
standard mais également très performant.
Autres serveurs de messaging en Go.

Liens
Code des slides
github.com/processone/talks/tree/master/2016/go-paris-meetup(https://github.com/processone/talks/tree/master/2016/go-paris-meetup)
Go XMPP
github.com/processone/gox (https://github.com/processone/gox)
Go MQTT
github.com/processone/mqtt (https://github.com/processone/mqtt)
Brave New Geek: Dissecting Message Queues
bravenewgeek.com/dissecting-message-queues/ (http://bravenewgeek.com/dissecting-message-queues/)

Thank you
Mickaël Rémond
ProcessOne
[email protected] (mailto:[email protected])
http://www.process-one.net/ (http://www.process-one.net/)
@mickael (http://twitter.com/mickael)
