Canalul

Canale în Go
Published on February 23, 2023
channels broadcast pubsub chatGPT

About 8 minutes of reading.


Totul a pornit de la o discuție cu unul dintre oamenii care învață Go împreună cu mine. M-a întrebat despre „separation of concerns”, în sensul în care își dorea ca aplicația lui să folosească canale pentru componentele care trebuie să anunțe alte componente despre acțiuni, de tipul „fire and forget”.

Un user își face cont, venind de pe Google.

După ce scriem în baza de date, scriem într-un canal care este ascultat de o componentă care trimite emailul de bun venit, aceasta îl primește și într-un mod separat de răspunsul pe care-l întoarce serverul, încearcă să trimită emailul. Să zicem că același canal este ascultat într-o altă gorutină care se ocupă cu adusul profilului de pe Google.

Cu alte cuvinte, avem un canal, iar atunci când scriem în el, câteva goroutine trebuie să primească toate același payload.

„Să trecem la scris cod!” am zis, iar colegul meu mi-a arătat următoarea bucată de cod:

package main

import (
	"context"
	"fmt"
	"testing"
	"time"
)

func Rifleman(ctx context.Context, c chan string) {
	for {
		select {
		case <-ctx.Done():
			fmt.Println("Rifleman: I'm done")
			return
		case msg := <-c:
			fmt.Println("Rifleman:", msg)
		}
	}
}

func Tank(ctx context.Context, c chan string) {
	for {
		select {
		case <-ctx.Done():
			fmt.Println("Tank: I'm done")
			return
		case msg := <-c:
			fmt.Println("Tank:", msg)
		}
	}
}

func Artilery(ctx context.Context, c chan string) {
	for {
		select {
		case <-ctx.Done():
			fmt.Println("Artillery: I'm done")
			return
		case msg := <-c:
			fmt.Println("Artillery:", msg)
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	c := make(chan string)

	go Rifleman(ctx, c)
	go Tank(ctx, c)
	go Artilery(ctx, c)

	c <- "Fire!"

	cancel()

	time.Sleep(time.Second) // drain messages

}

Zic „de unde ai luat asta?”. Și-mi spune că i-a scris-o chatGPT și că ar trebui să afișeze următorul output:

Artillery: Fire!
Tank: Fire!
Rifleman: Fire!
Tank: I'm done
Rifleman: I'm done
Artillery: I'm done

… bine, cu mențiunea că ordinea se poate schimba, fiind vorba de gorutine.

„Bineînțeles că nu merge”, i-am spus. „Un canal poate fi văzut ca un tub prin care datele pot fi trimise de la o gorutină la alta. Însă, „intuiția” cum că ar fi posibil să scrii într-un canal și să ajungă la mai multe gorutine nu este corectă.

Un canal poate fi citit de o singură gorutină la un moment dat. Acest lucru înseamnă că nu este posibil să trimiți același mesaj către mai multe gorutine folosind același canal. Deci codul de mai sus nu face ceea ce ne așteptăm noi să facă.

Am apreciat faptul că a folosit context-ul (context.Context) ca să poată fi un bun cetățean și să iasă din gorutine când e cazul.

Acum, hai totuși să rezolvăm problema.

O modalitate ar fi să implementăm o funcționalitate de tip pub-sub, dar nu este cazul poveștii de aici.

După un pic de ajutor despre cum ar trebui să arate codul nostru, colegul a produs următorul cod:

package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

type Payload struct {
	Command string
}

type Fire struct {
	mu   sync.Mutex
	at   atomic.Pointer[chan Payload]
	gets int
}

func NewFire() Fire {
	result := Fire{}
	newCh := make(chan Payload)
	result.at.Store(&newCh)
	return result
}

func (f *Fire) Get() <-chan Payload {
	p := f.at.Load()
	if p == nil {
		fmt.Println("FATAL ERROR : channel is not present in atomic pointer")
		return nil
	}

	f.mu.Lock()
	defer f.mu.Unlock()

	f.gets++

	return *p

}

func (f *Fire) Send(payload Payload) {
	ch := f.at.Load()
	if ch == nil {
		fmt.Println("FATAL ERROR : channel is not present in atomic pointer")
		return
	}

	for i := f.gets; i > 0; i-- {
		*ch <- payload
	}

	f.mu.Lock()
	defer f.mu.Unlock()
	f.gets = 0
}

func RifleMan(ctx context.Context, f Fire) {
	for {
		select {
		case payload := <-f.Get():
			fmt.Println("conscript received command : ", payload, payload.Command)
		case <-ctx.Done():
			return
		}
	}
}

func Tank(ctx context.Context, f Fire) {
	for {
		select {
		case payload := <-f.Get():
			fmt.Println("tank received command : ", payload, payload.Command)
		case <-ctx.Done():
			return
		}
	}
}

func Artillery(ctx context.Context, f Fire) {
	for {
		select {
		case payload := <-f.Get():
			fmt.Println("artillery received command : ", payload, payload.Command)
		case <-ctx.Done():
			return
		}
	}
}

func main() {
	fire := NewFire()
	ctx, cancel := context.WithCancel(context.Background())

	go RifleMan(ctx, fire)
	go Tank(ctx, fire)
	go Artillery(ctx, fire)

	<-time.After(1 * time.Second)

	fire.Send(Payload{Command: "fire at will!"})

	<-time.After(2 * time.Second)

	fire.Send(Payload{Command: "fire again"})

	cancel()

	<-time.After(2 * time.Second) // drain all messages if necessary
}

Cum l-am ajutat și care a fost planul:

Spre surpiza colegului, output-ul arăta așa:

Asta pentru că atunci când a chemat funcțiile RifleMan, Tank și Artillery, le-a chemat cu struct-ul creat, nu cu pointer către respectivul struct. O greșeală comună pentru începători în Go, pentru că lucrând cu valoarea în loc de pointer, obține întotdeauna o copie a structului original. Varianta funcțională, care folosește pointeri, o puteți găsi aici.

Pasul următor (logic, după mine), este să lucrăm cu o interfață, astfel încât să nu permitem funcțiilor ascultătoare să modifice comportamentul structului nostru, întrun fel sau altul.

Iată interfața:

type IFire interface {
    Get() <-chan Payload
}

iar funcțiile RifleMan, Tank și Artillery își vor modifica semnătura, să accepte interfața în loc de pointer.

Nu știu dacă ați observat, dar primul „branch” din selectul ascultătorilor este întotdeauna legat de context. Asta pentru că, tot din motive de concurență, este recomandat ca prima verificare din select să fie despre context Done(). Recomandarea aceasta nu este o regulă bătută în cuie, dar atunci când lucrăm cu „cancellation” sau „timeout” ne ferește de posibile bug-uri de concurență.

Atunci când un canal este închis, orice operațiune de citire pe acel canal va returna imediat o valoare zero a payload-ului, iar orice alte operațiuni cu acel canal vor provoca un „panic”. Asta înseamnă, că dacă context.Done() nu e prima alegere din selectul în cauză și contextul este terminat întrun fel sau altul, celelalte branch-uri vor continua să se execute înainte de ieșirea din gorutină, provocând comportament neașteptat sau chiar „panic”.

Știind acest comportament al canalelor (atunci când sunt închise, ascultătorii primesc valoarea zero), am mai încercat o variantă în care să ne folosim de acest „feature” (cumva neașteptat și neintuitiv) în loc să folosim counter-ul din prima soluție. În afară de faptul că trebuie să facem un canal nou, când precedentul este închis, ascultătorii nu primesc altceva decât valoarea zero a payload-ului, deci nu reprezintă o posibilă soluție. Pentru curioși, experimentul se găsește aici.

Take Aways

Alte probleme care pot apărea (corner case-uri la care să fim atenți):

  1. dacă întruna dintre funcțiile ascultătoare avem de făcut operațiuni heavy-duty, este de preferat să facem acest lucru într-o altă gorutină, pentru că altfel stricăm ciclul „normal” de funcționare. Un exemplu corect de folosire poate fi găsit aici alături de cel care creeză probleme.
  2. este evident pentru mine că acest mod de-a implementa o formă de pub-sub, nu este recomandată în cazul în care unul dintre ascultători trebuie să renunțe la a mai asculta împreună cu ceilalți. E un fel de „toți pentru unul și unul pentru toți”, caz în care, dacă este nevoie ca un participant să nu mai fie în grupul de ascultători, atunci este de preferat să facem un „hard reset”, în sensul în care toți renunță în același timp și numai cei care trebuie să revină la a asculta vor reporni gorutinele. Asta, bineînțeles că poate crea probleme pentru cazul în care producătorul mesajelor (cel care scrie în canal) scrie exact în momentul hard reset-ului.
  3. dacă în funcțiile ascultătoare avem de ascultat mai mult decât cele două canale - cel pentru context.Done() și cel pentru mesajul efectiv, atunci este de preferat să nu folosim această metodă. Gradul de complexitate a problemelor crește foarte mult și este de preferat un pub-sub clasic.

În loc de concluzie

Pentru cazul în care avem o situație care nu se schimbă, adică ascultătorii, odată inițializați nu renunță, acest pattern este unul mai bun decât pub-sub din punct de vedere al simplității. Bineînțeles că se poate implementa o variantă generică.

Mi se pare demn de reținut că chatGPT nu reprezintă încă o soluție bună în materie de asistență în limbajul Go pentru canale. Eu l-am tot testat, de când a apărut, cu diverse întrebări pe diverse subiecte. Se descurcă destul de bine la optimizări și la a explica cod, însă pe partea de canale în Go este de-a dreptul tont. Aștept ziua când voi putea să explic celor care învață Go canalele cu ajutorul lui.

comments powered by Disqus