Projeto: Servidor de Armazenamento Chave-Valor Distribuído com Consistência Eventual
Objetivo
O objetivo deste projeto é implementar o servidor, ou nó, de Armazenamento Chave-Valor Distribuído, eventualmente consistente. Este nó será capaz de:
- Atender requisições de
PuteGetde clientes via gRPC. - Manter a consistência dos dados com outros nós no cluster através de replicação via MQTT, utilizando Vector Clocks para resolução de conflitos.
O foco será na implementação da lógica central para armazenamento de dados, gerenciamento de consistência e comunicação entre nós, demonstrando uma compreensão dos fundamentos de sistemas distribuídos sem a necessidade de gerenciar a associação ao cluster ou algoritmos de consenso complexos.
Visão Geral da Arquitetura
Cada nó no armazenamento Chave-Valor distribuído opera de forma independente, mas se comunica com outros nós para alcançar a consistência eventual.
- Comunicação Cliente-Nó (gRPC): Clientes interagem com qualquer nó no cluster usando gRPC para realizar operações de
Put(escrita) eGet(leitura). - Comunicação Nó-Nó (MQTT): Os nós usam um broker MQTT para difundir atualizações (replicar dados) para todos os outros nós inscritos. Isso permite uma propagação de dados assíncrona e desacoplada.
Diagrama Conceitual:
- Servidores se subscrevem para receber atualizações.
- Clientes fazem requisições a servidores individuais.
- Atualizações são publicadas e entregues a todos os servidores pelo Broker.
Estruturas de Dados
O armazenamento chave-valor irá guardar dados de forma que permita que múltiplas versões concorrentes do valor de uma chave e seu histórico causal sejam rastreados.
-
VectorClock:- Tipo: Um mapa/dicionário onde as chaves são
NodeID(string) e os valores sãoCounter(inteiro de 64 bits sem sinal). - Propósito: Rastreia o histórico causal de um dado. Cada entrada
(NodeID, Counter)indica que esta versão do dado viu todas as atualizações deNodeIDaté oCounterespecificado.
- Tipo: Um mapa/dicionário onde as chaves são
-
Version:- Tipo: Uma estrutura/objeto que representa uma única versão de um valor para uma dada chave.
- Campos:
value: String (o dado real armazenado).vector_clock:VectorClock(o vector clock associado a esta versão específica).timestamp: Inteiro de 64 bits sem sinal (nanosegundos desde a Época Unix quando esta versão foi criada). Usado como desempate para versões concorrentes.writer_node_id: String (oNodeIDdo nó que originalmente escreveu esta versão).
-
StoreEntry:- Tipo: Uma estrutura/objeto que representa todas as versões ativas para uma chave específica.
- Campos:
key: String (a chave para a qual os valores estão armazenados).versions: Lista/Array de objetosVersion(contém todas as versões atualmente ativas e não substituídas para esta chave).
-
NodeState:- Tipo: O objeto de estado central para um único nó.
- Campos:
node_id: String (Um identificador único para este nó).store: Um mapa/dicionário concorrente que mapeia chaves para seus objetosStoreEntry.
Algoritmos
O núcleo do modelo de consistência reside em como os VectorClocks são gerenciados e como as operações de Put e Get interagem com eles.
-
merge_vector_clocks(vc1, vc2):- Entrada: Dois
VectorClocks,vc1evc2. - Saída: Um novo
VectorClock.- Para cada
NodeIDpresente emvc1ouvc2, o clock mesclado assume o valor máximo do contador. Se umNodeIDestiver presente apenas em um clock, seu contador é assumido diretamente.
- Para cada
- Exemplo:
vc1 = {A:1, B:2}vc2 = {A:2, C:1}merged = {A:2, B:2, C:1}
- Entrada: Dois
-
compare_vector_clocks(vc1, vc2):- Entrada: Dois
VectorClocks,vc1evc2. - Saída:
truesevc1aconteceu antes devc2(ou seja,vc1é estritamente menor ou igual avc2em todos os componentes, e estritamente menor em pelo menos um, ouvc2tem componentes que não estão emvc1).falsesevc2aconteceu antes devc1(simétrico atrue).null/Nonesevc1evc2forem concorrentes (nenhum aconteceu antes do outro).
- Entrada: Dois
-
process_put(key, new_version_value, is_replication_source):- Entrada:
key(string),new_version_value(string),is_replication_source(booleano -truese este PUT for de uma mensagem de replicação MQTT,falsese for de um cliente gRPC). - Saída:
void(ou erro indicando falha). -
Lógica: a. Criar nova versão para inclusão: * Se
!is_replication_source(ou seja, PUT iniciado pelo cliente): * adicione os valores adequados paratimestampe owriter_node_id. * atualize o Vector Clock para a chave, fazendo merge com o valor anterior, caso necessário. * Caso contrário (is_replication_source): * O objetonew_versionrecebido já conterá seutimestamp,writer_node_idevector_clock. Use-os diretamente.b. Resolução de Conflitos / Gerenciamento de Versões: * A nova versão obtida deverá substituir qualquer versão precedente no
StoreEntrydostore. * Caso a nova versão seja concorrente com alguma versão nostore, ambas deverão ser mantidas noStoreEntry. * Compare os vector clocks para saber se há ou não concorrência entre as versões. * Não deve haver versões repetidas para a mesma chave noStoreEntry.c. Replicar (se não for fonte de replicação): * Se
!is_replication_source(ou seja, este PUT originou-se de um cliente neste nó): * Serialize oStoreEntryatualizado em uma string JSON. * Publique esta string JSON no tópico MQTTkvstore/replicationcom QoS 1 (AtLeastOnce). Isso garante que outros nós recebam a atualização. * Esta publicação deve ocorrer de forma assíncrona (ex: em uma thread/goroutine/tarefa separada) para evitar bloquear a resposta gRPC.
- Entrada:
-
process_get(key):- Entrada:
key(string). - Saída: Lista/Array de objetos
Version(ou lista vazia se a chave não for encontrada). - Lógica:
a. Recupere o
StoreEntryparakey. Se não for encontrado, retorne uma lista vazia. b. Retorne apenas os objetosVersions ativos, ou seja, versões concorrentes ou versões que não precedem nenhum outra versão ativa.
- Entrada:
Protocolos de Comunicação
-
API gRPC (
kv_store.proto): Deve-se usar, obrigatoriamente, a seguinte definição.protopara gerar o código do cliente (previamente fornecido) e do servidor para a linguagem escolhida.1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60
syntax = "proto3"; package kvstore; // O principal serviço de armazenamento chave-valor service KvStore { // Armazena um par chave-valor. Aciona a replicação para outros nós. rpc Put (PutRequest) returns (PutResponse); // Recupera todas as versões ativas para uma dada chave. rpc Get (GetRequest) returns (GetResponse); } // --- Estruturas de Dados --- // Representa uma única entrada em um Vector Clock. message VectorClockEntry { string node_id = 1; uint64 counter = 2; } // Um Vector Clock, representado como uma lista de entradas. message VectorClock { repeated VectorClockEntry entries = 1; } // Representa uma versão específica de um valor para uma chave. message Version { string value = 1; // O vector clock associado a esta versão. Opcional para flexibilidade. VectorClock vector_clock = 2; uint64 timestamp = 3; // Nanosegundos desde a época Unix. string writer_node_id = 4; // O ID do nó que originalmente escreveu esta versão. } // --- Mensagens de Requisição --- // Requisição para a operação Put. message PutRequest { string key = 1; string value = 2; } // Requisição para a operação Get. message GetRequest { string key = 1; } // --- Mensagens de Resposta --- // Resposta para a operação Put. message PutResponse { bool success = 1; string error_message = 2; } // Resposta para a operação Get. message GetResponse { repeated Version versions = 1; // Todas as versões ativas encontradas para a chave. string error_message = 2; } -
API MQTT:
- Broker: Qualquer broker MQTT padrão (ex: Mosquitto) pode ser usado.
- Tópico:
kvstore/replication - Payload: Quando um nó origina uma requisição
Put(ou seja,is_replication_sourceéfalseemprocess_put), ele deve serializar oStoreEntrycompleto (não apenas aVersion) para JSON e publicá-lo neste tópico. - Qualidade de Serviço (QoS): QoS 1 (AtLeastOnce) deve ser usado para publicação e assinatura para garantir que as mensagens sejam entregues.
Detalhes da Implementação do Servidor
-
Argumentos de Linha de Comando: O servidor deve aceitar os seguintes argumentos de linha de comando:
--node-id <ID>: Um identificador de string único para este nó (ex:node_A,node_B). Se não for fornecido, gere um UUID.--listen-addr <ADDRESS>: O endereço IP e a porta onde o servidor gRPC irá escutar (ex:127.0.0.1:50051).--mqtt-broker-addr <ADDRESS>: O endereço IP do broker MQTT. Se não for fornecido, utilize127.0.0.1.--mqtt-broker-port <PORT>: A porta do broker MQTT. Se não for fornecida, utilize1883.
-
Gerenciamento de Concorrência:
- O
storedoNodeStatedeve ser thread-safe. Use primitivas de sincronização apropriadas para sua linguagem (ex:Mutexem Rust,sync.Mutex/sync.RWMutexem Go,threading.Lockpara um dicionário em Python,ConcurrentHashMapem Java). - A publicação MQTT deve ser geralmente não bloqueante ou assíncrona da thread de tratamento de requisições gRPC.
- O
-
Fluxo Lógico Principal do Servidor:
- Analisar os argumentos de linha de comando.
- Inicializar a conexão do cliente MQTT com o broker especificado.
- Criar uma instância de
NodeState. - Disparar uma Tarefa/Thread de Escuta MQTT: Esta unidade concorrente separada irá:
- Assinar o tópico
kvstore/replication. - Continuamente buscar por mensagens MQTT recebidas.
- Quando uma mensagem no
kvstore/replicationfor recebida:- Desserializar o payload JSON de volta para um objeto
StoreEntry. - Para cada
Versiondentro doStoreEntrydesserializado, chamarprocess_put(key, version, true)(importante:is_replication_sourcedeve sertrue).
- Desserializar o payload JSON de volta para um objeto
- Assinar o tópico
- Iniciar o Servidor gRPC: Isso exporá os métodos
PuteGetaos clientes.- O método gRPC
Putchamaráprocess_put(key, value, false). - O método gRPC
Getchamaráprocess_get(key).
- O método gRPC
Requisitos do projeto
- Implementar Estruturas de Dados Essenciais: Traduza
VectorClock,VersioneStoreEntrypara classes/estruturas na linguagem escolhida. - Implementar Algoritmos de Vector Clock: Implemente
merge_vector_clocksecompare_vector_clockscorretamente. - Implementar
process_put: Esta é a parte mais crítica para a consistência. Garanta que ela lide com atualizações de vector clock, resolução de conflitos e replicação MQTT condicional. - Implementar
process_get: Garanta que ela retorne apenas versões ativas com base nas comparações de vector clock. - Implementação do Servidor gRPC:
- Use o arquivo
.protofornecido para gerar o código necessário. - Implemente o serviço
KvStorecom os RPCsPuteGet. - Integre com seu
NodeStatepara chamarprocess_puteprocess_get.
- Use o arquivo
- Integração do Cliente MQTT:
- Use uma biblioteca de cliente MQTT adequada para sua linguagem.
- Configure uma tarefa/thread em segundo plano para assinar
kvstore/replication. - Implemente a lógica para desserializar e processar mensagens de replicação recebidas.
- Implemente a lógica para serializar e publicar objetos
StoreEntryapós operaçõesPutiniciadas pelo cliente.
- Análise de Argumentos de Linha de Comando: Implemente a análise de argumentos para
node-id,listen-addr,mqtt-broker-addremqtt-broker-port.
Sugestões de bibliotecas por linguagem
- Java:
- gRPC: Use
grpc-java. - MQTT: Use Eclipse Paho MQTT Client.
- Concorrência:
java.util.concurrent.ConcurrentHashMapparastore, blocossynchronizedouReentrantLockpara travamento mais granular se necessário,ExecutorServicepara tarefas em segundo plano. - JSON: Jackson ou Gson para serialização/desserialização de
StoreEntry.
- gRPC: Use
- Go:
- gRPC: Use
google.golang.org/grpc. - MQTT: Use
github.com/eclipse/paho.mqtt.golang. - Concorrência:
sync.Mutexousync.RWMutexparastore,goroutines para tarefas concorrentes. - JSON: Pacote
encoding/json.
- gRPC: Use
- Python:
- gRPC: Use
grpcio. - MQTT: Use
paho-mqtt. - Concorrência:
threading.Lockparastore(umdict),threading.Threadouasynciopara tarefas concorrentes. - JSON: Módulo
json.
- gRPC: Use
- Rust:
- gRPC: Use
tonic. - MQTT: Use
rumqttc. - Concorrência:
tokio::sync::Muteparastore,tokio::spawnpara tarefas assíncronas. - JSON:
serdecomserde_json.
- gRPC: Use
Teste e Verificação
- Configuração: É necessário um broker MQTT em execução (ex: Mosquitto).
- Funcionalidade Básica:
- Inicie vários nós (ex: Nó A na 50051, Nó B na 50052, Nó C na 50053).
- Use um cliente gRPC para realizar um
Putde um par chave-valor no Nó A. - Verifique se as requisições
Getpara o Nó A retornam o valor correto. - Após um pequeno atraso (para replicação), as requisições
Getpara o Nó B e o Nó C também devem retornar o valor.
- Resolução de Conflitos:
- Realize um
Putdekey="teste"comvalue="primeiro"no Nó A. - Quase simultaneamente, realize um
Putdekey="teste"comvalue="segundo"no Nó B (antes que o Nó B receba a replicação do Nó A). - Após a replicação se estabilizar em todos os nós, uma requisição
Getparakey="teste"de qualquer nó deve retornar ambos "primeiro" e "segundo" como versões ativas, demonstrando concorrência.
- Realize um
- Substituição (Supersedence):
- Realize um
Putdekey="exemplo"comvalue="original"no Nó A. - Espere pela replicação para todos os nós.
- Realize um
Putdekey="exemplo"comvalue="atualizado"no Nó B. (O Nó B agora tem a versão original e a atualiza). - Após a replicação se estabilizar, as requisições
Getparakey="exemplo"de qualquer nó devem retornar apenas "atualizado", pois "original" deve ter sido substituída.
- Realize um
Detalhamento da Entrega
- Todos os arquivos necessários para execução do servidor devem ser colocados em um repositório privado no
github.comcom acesso liberado para meu usuário (paulo-coelho). - Os trabalhos devem ser feitos, obrigatoriamente, em grupos de 4 alunos. Não serão aceitos trabalhos individuais, ou seja, será atribuída nota zero.
- O repositório deve conter, além do código-fonte, um arquivo
README.mddetalhando:- O que foi e o que não foi implementado.
- Como compilar o código.
- Como executar cada servidor.
- Principais dificuldades encontradas.
- Detalhamento das estruturas de dados que armazenam as chaves/valores/versões.
- Implementações que fizerem alterações no arquivo
.protoou que não funcionarem com o cliente fornecido não serão corrigidas e receberão nota zero. - O repositório deve conter, na pasta raiz, um script para compilação denominado
compile.she um script para executar o servidor com os parâmetros necessários, denominadoserver.sh. - Apenas as linguagens listadas a seguir serão aceitas. Trabalhos em outras linguagens não serão corrigidos e receberão nota zero.
- Go
- Java
- Python
- Rust
Este projeto oferece uma base sólida para a compreensão de aspectos críticos de sistemas distribuídos em um ambiente prático. Bom trabalho a todos!