Go: Programação Distribuída

De Aulas
Revisão de 21h15min de 4 de novembro de 2021 por Admin (discussão | contribs) (→‎Cliente)

TCP Cliente

Efetua uma conexão TCP com um servidor. Aguarda o usuário digitar mensagens e dar ENTER. Para saír, basta digitar EXIT.

 1package main
 2
 3import (
 4	"bufio"
 5	"fmt"
 6	"net"
 7	"os"
 8	"strings"
 9)
10
11func main() {
12	arguments := os.Args     // Pega os argumentos da linha de comando
13	if len(arguments) == 1 { // Se não tiver argumentos retorna erro
14		fmt.Println("Enter with arguments host:port.")
15		return
16	}
17
18	// Usa os argumentos e se conecta ao servidor host:port
19	c, err := net.Dial("tcp", arguments[1])
20	if err != nil {
21		fmt.Println(err)
22		return
23	}
24	defer c.Close()
25	fmt.Print("Digit your message and press key ENTER to send.")
26	fmt.Println("To exit, digit EXIT and press key emter.")
27	for {
28		reader := bufio.NewReader(os.Stdin) // Prepara o buffer de leitura
29		fmt.Print("MSG: ")
30		text, _ := reader.ReadString('\n') // Le um texto do teclado
31		fmt.Fprintf(c, text+"\n")          // Envia o texto pela conexão
32
33		message, _ := bufio.NewReader(c).ReadString('\n') // Aguarda resposta do servidor
34		fmt.Print("RCV: " + message)
35		// Se a resposta for EXIT, fecha a conexão e o cliente
36		if strings.ToUpper(strings.TrimSpace(string(text))) == "EXIT" {
37			fmt.Println("TCP client exiting...")
38			return
39		}
40	}
41}

TCP Servidor Simples

Inicia o servidor e fica aguardando um cliente. Recebe as mensagens do cliente. Quando o cliente envia EXIT, fecha o servidor e a conexão.

 1package main
 2
 3import (
 4	"bufio"
 5	"fmt"
 6	"net"
 7	"os"
 8	"strings"
 9	"time"
10)
11
12func main() {
13	arguments := os.Args     // Pega a porta como argumento da linha de comando
14	if len(arguments) == 1 { // Se não for passado a porta dá erro
15		fmt.Println("Enter with port number in argument")
16		return
17	}
18	l, err := net.Listen("tcp", ":"+arguments[1]) // Inicia a conexão TCP na porta
19	if err != nil {
20		fmt.Println(err)
21		return
22	}
23	defer l.Close() // Ao final, fecha a conexão.
24	fmt.Println("TCP Server initialized at port", arguments[1])
25
26	c, err := l.Accept() // Fica aguardando um cliente se conectar
27	if err != nil {
28		fmt.Println(err)
29		return
30	}
31	fmt.Println("Client connected...")
32
33	for { // Laço eterno
34		// Recebe informações no buffer de leitura
35		netData, err := bufio.NewReader(c).ReadString('\n')
36		if err != nil {
37			fmt.Println(err)
38			return
39		}
40		// Se for EXIT, fecha o sevidor, mas fecha a conexão antes
41		if strings.ToUpper(strings.TrimSpace(string(netData))) == "EXIT" {
42			fmt.Println("TCP Server terminated...")
43			return
44		}
45
46		// Mostra a mensagem na tela e envia de volta o horário
47		fmt.Print(": ", string(netData))
48		t := time.Now()
49		myTime := t.Format(time.RFC3339) + "\n"
50		c.Write([]byte(myTime)) // Escreve no buffer de escrita para o cliente
51	}
52}

TCP Servidor com múltiplas conexões

Observe que agora temos um servidor diferente. No laço for do main ficamos aguardando um cliente se conectar. Quando houver uma conexão, essa conexão é transferida para uma função child que é chamada concorrentemente como gorotine e continua o laço, aguardando uma nova conexão.

Cada gorotine, ora chamada child, vai ficar responsável por gerenciar uma conexão específica. Quando o cliente mandar a palavra exit, não fecha o servidor, apenas aquela conexão.

Dessa forma, nosso servidor pode trabalhar com múltiplas conexões simultâneas, respondendo concorrentemente há vários clientes.

Veja que no código, quando um cliente manda uma mensagem, escrevemos qual o endereço da conexão para identificar a mensagem e diferenciá-la das mensagens dos outros clientes.

 1package main
 2
 3import (
 4	"bufio"
 5	"fmt"
 6	"net"
 7	"os"
 8	"strings"
 9	"time"
10)
11
12func child(conn net.Conn) {
13	addr := conn.RemoteAddr() // Pega o endereço do cliente
14	fmt.Println(addr, ": connected...")
15	defer fmt.Println(addr, ": disconnected...")
16	defer conn.Close() // Ao final, fecha a conexão
17
18	for {
19		// Recebe informações no buffer de leitura
20		netData, err := bufio.NewReader(conn).ReadString('\n')
21		if err != nil {
22			fmt.Println(err)
23			return
24		}
25		// Se for EXIT, fecha fecha a conexão
26		if strings.ToUpper(strings.TrimSpace(string(netData))) == "EXIT" {
27			fmt.Println(addr, ": exit...")
28			return
29		}
30		// Mostra a mensagem na tela e envia de volta o horário
31		fmt.Print(addr, " : ", string(netData))
32		t := time.Now()
33		myTime := t.Format(time.RFC3339) + "\n"
34		conn.Write([]byte(myTime)) // Escreve no buffer de escrita para o cliente
35	}
36}
37
38func main() {
39	arguments := os.Args     // Pega a porta como argumento da linha de comando
40	if len(arguments) == 1 { // Se não for passado a porta dá erro
41		fmt.Println("Enter with port number in argument")
42		return
43	}
44	l, err := net.Listen("tcp", ":"+arguments[1]) // Configura uma porta TCP
45	if err != nil {
46		fmt.Println(err)
47		return
48	}
49	defer l.Close() // Ao final, fecha a conexão.
50	fmt.Println("TCP Server initialized at port", arguments[1])
51
52	for {
53		c, err := l.Accept() // Aguarda o próximo cliente se conectar
54		if err != nil {
55			fmt.Println(err)
56			return
57		}
58		go child(c) // Passa a conexão para uma gorotine gerenciar
59	}
60}

Daria pra fazer um chat de grupo com isso, não? É um bom exercício.

UDP Cliente

Veja que a conexão com um servidor UPD é diferente, então o cliente UDP também precisa ser específico pra isso. Por isso, não tente usar um cliente TCP para se conectar a um servidor UDP, ou melhor, tente pra ver o que acontece (¬‿¬)

Veja que aqui temos uma troca de mensagens via buffer de bytes.

 1package main
 2
 3import (
 4	"bufio"
 5	"fmt"
 6	"net"
 7	"os"
 8	"strings"
 9)
10
11func main() {
12	arguments := os.Args
13	if len(arguments) == 1 {
14		fmt.Println("Please provide a host:port string")
15		return
16	}
17
18	s, err := net.ResolveUDPAddr("udp4", arguments[1])
19	c, err := net.DialUDP("udp4", nil, s)
20	if err != nil {
21		fmt.Println(err)
22		return
23	}
24	fmt.Println("The UDP server is", c.RemoteAddr().String())
25	defer c.Close()
26
27	for {
28		reader := bufio.NewReader(os.Stdin)
29		fmt.Print(">> ")
30		text, _ := reader.ReadString('\n')
31		data := []byte(text + "\n")
32		_, err = c.Write(data)
33		if strings.ToUpper(strings.TrimSpace(string(data))) == "EXIT" {
34			fmt.Println("Exiting UDP client!")
35			return
36		}
37		if err != nil {
38			fmt.Println(err)
39			return
40		}
41		buffer := make([]byte, 1024)
42		n, _, err := c.ReadFromUDP(buffer)
43		if err != nil {
44			fmt.Println(err)
45			return
46		}
47		fmt.Println("Reply:", string(buffer[0:n]))
48	}
49}

UDP Servidor

E então, nosso servidor UDP. É muito parecido com o TCP, mas configurado de forma diferente e agora trabalha com mensagens na forma de buffer de bytes.

 1package main
 2
 3import (
 4	"fmt"
 5	"math/rand"
 6	"net"
 7	"os"
 8	"strconv"
 9	"strings"
10	"time"
11)
12
13func main() {
14	arguments := os.Args
15	if len(arguments) == 1 {
16		fmt.Println("Please provide a port number!")
17		return
18	}
19	s, err := net.ResolveUDPAddr("udp4", ":"+arguments[1])
20	if err != nil {
21		fmt.Println(err)
22		return
23	}
24	conn, err := net.ListenUDP("udp4", s)
25	if err != nil {
26		fmt.Println(err)
27		return
28	}
29	defer conn.Close()
30	buffer := make([]byte, 1024)
31	rand.Seed(time.Now().Unix())
32
33	for {
34		/*
35			Veja que estamos trabalhando com leitura em buffer, em bytes.
36			A função ReadFromUDP retorna também o tamanho da mensagem que
37			estamos recebendo no buffer e transformamos em string
38		*/
39		n, addr, err := conn.ReadFromUDP(buffer)
40		fmt.Print(addr, " -> ", string(buffer[0:n-1]))
41
42		// Se a mensagem for exit, fechamos a conexão e o servidor
43		if strings.ToUpper(strings.TrimSpace(string(buffer[0:n]))) == "EXIT" {
44			fmt.Println("Exiting UDP server!")
45			return
46		}
47
48		data := []byte(strconv.Itoa(rand.Intn(1000)))
49		fmt.Println(addr, "<-", string(data))
50		_, err = conn.WriteToUDP(data, addr)
51		if err != nil {
52			fmt.Println(err)
53			return
54		}
55	}
56}

Exemplo de Computação Distribuída

Vamos usar como exemplo um programa que calcula o fatorial de um número, ou seja:

n! = 1 * 2 * 3 * ... * n

Contudo, vamos dividir o problema em partes e mandar para servidores executar esses pedaços de cálculos para por fim calcular o resultado final no programa principal.

Vamos supor que temos o número 15 e queremos dividir em 4 subprocessos. Também temos 3 servidores disponíveis para executar o cálculo. Veja que um dos servidores vai receber dois pacotes para calcular.

Para isso, dividimos os números em 4 blocos

  • de 1 a 4
  • de 5 a 8
  • de 9 a 12
  • de 13 a 15 (lembrando que não podemos passar do valor que queremos calcular)

No programa, vamos enviando cada um pacote de dados (begin, end) para cada servidor. Quando acabarem os servidores disponíveis, começa novamente do primeiro e vai enviando novos pacotes de dados. Nesse caso, só mandou dois pacotes para o primeiro.

Conforme as gorotines vão recebendo o resultado, vão jogando em um canal buffer.

Quando todas as rotinas terminam, o programa pega todos os resultados e multiplicas eles, gerando o resultado final.

Do lado do servidor, ele apenas multiplica cada número, do número inicial até o final, e envia de volta o resultado.

Veja que estamos trabalhando com mensagens JSON aqui.

Servidor

 1package main
 2
 3import (
 4	"bufio"
 5	"encoding/json"
 6	"fmt"
 7	"net"
 8	"os"
 9	"strconv"
10)
11
12type Message struct {
13	Begin int `json:"begin"`
14	End   int `json:"end"`
15}
16
17func child(conn net.Conn) {
18	defer conn.Close()
19	netData, err := bufio.NewReader(conn).ReadString('\n')
20	if err != nil {
21		fmt.Println(err)
22		return
23	}
24	msg := Message{}
25	json.Unmarshal([]byte(netData), &msg)
26	out := 1
27	for i := msg.Begin; i <= msg.End; i++ {
28		out *= i
29	}
30	fmt.Println(msg.Begin, "->", msg.End, ":", out)
31	conn.Write([]byte(strconv.Itoa(out)))
32}
33
34func main() {
35	arguments := os.Args
36	if len(arguments) == 1 {
37		fmt.Println("Enter with port number in argument")
38		return
39	}
40	l, err := net.Listen("tcp", ":"+arguments[1])
41	if err != nil {
42		fmt.Println(err)
43		return
44	}
45	defer l.Close()
46	fmt.Println("TCP Server initialized at port", arguments[1])
47
48	for {
49		c, err := l.Accept()
50		if err != nil {
51			fmt.Println(err)
52			return
53		}
54		go child(c)
55	}
56}

Cliente

 1package main
 2
 3import (
 4	"bufio"
 5	"encoding/json"
 6	"fmt"
 7	"net"
 8	"os"
 9	"strconv"
10	"sync"
11)
12
13type Message struct { // Estrutura da mensagem
14	Begin int `json:"begin"`
15	End   int `json:"end"`
16}
17
18func connect_server(wg *sync.WaitGroup, ch chan int, server string, begin, end int) {
19	defer wg.Done()
20	c, err := net.Dial("tcp", server) // Conecta ao servidor
21	if err != nil {
22		fmt.Println(err)
23		return
24	}
25	defer c.Close()
26	msg := Message{begin, end}                        // cria uma mensagem na estrutura definida
27	msgJson, _ := json.Marshal(msg)                   // transforma em string json
28	fmt.Fprintf(c, string(msgJson)+"\n")              // envia ao servidor
29	message, _ := bufio.NewReader(c).ReadString('\n') // aguarda a resposta
30	result, err := strconv.Atoi(message)              // transforma a resposta para inteiro
31	if err == nil {
32		fmt.Println(begin, "->", end, ":", result)
33		ch <- result // manda o resultado para o programa principal
34	}
35}
36
37/*
38	EXECUÇÃO:
39	go run fat_dist_cli.go [value] [rotines] [server:port] ...
40
41	1: Valor para calcular o fatorial
42	2: Quantidade de gorotines, ou conexões a servidores
43	3 em diante: servidores para se conectar no formato servidor:porta
44
45	EXEMPLO:
46
47	go run fat_dist_cli.go 20 5 localhost:4400 localhost:4402 localhost:4401
48
49	O exemplo acima pede para calcular o fatorial de 20, dividindo em 5
50	subrotinas. Em sequência temos 3 servidores para utilizar. Como o número
51	de servidores é menor do que a quantidade de subrotinas, dois deles vão receber
52	duas requisições de cálculos
53*/
54
55func main() {
56	args := os.Args                        // pega os argumentos
57	value, _ := strconv.Atoi(args[1])      // pega o valor
58	rotines, _ := strconv.Atoi(args[2])    // pega a quantidade de rotinas
59	size := int(value / rotines)           // calcula o tamanho do bloco a separar
60	servers := make([]string, len(args)-3) // cria o vetor de servidores
61	j := 0                                 //
62	for i := 3; i < len(args); i++ {       // coloca os parâmetros como servidores
63		servers[j] = args[i]
64		j++
65	}
66
67	var wg sync.WaitGroup
68	wg.Add(rotines)
69	ch := make(chan int, rotines)
70
71	j = 0
72	end := 0
73	for i := 0; i < rotines; i++ { // cria as subrotinas
74		begin := end + 1   // separa o inicio
75		end = begin + size // e o fim do bloco de cálculo
76		if end > value {   // Não pode passar do valor total
77			end = value
78		}
79		go connect_server(&wg, ch, servers[j], begin, end) // envia para um servidor
80		j++
81		if j >= len(servers) {
82			j = 0
83		}
84	}
85	wg.Wait() // aguarda os servidores terminarem
86	close(ch)
87	// Pega os resultados e multiplica-os
88	result := 1
89	for v := range ch {
90		result *= v
91	}
92	// apresenta o resultado final
93	fmt.Println("Fatorial de", value, "é", result)
94}