Projeto: Servidor de Armazenamento Chave-Valor Distribuído com Consistência Eventual
Objetivo
O objetivo deste projeto é implementar o servidor, ou nó, de Armazenamento Chave-Valor Distribuído, eventualmente consistente. Este nó será capaz de:
- Atender requisições de
Put
eGet
de clientes via gRPC. - Manter a consistência dos dados com outros nós no cluster através de replicação via MQTT, utilizando Vector Clocks para resolução de conflitos.
O foco será na implementação da lógica central para armazenamento de dados, gerenciamento de consistência e comunicação entre nós, demonstrando uma compreensão dos fundamentos de sistemas distribuídos sem a necessidade de gerenciar a associação ao cluster ou algoritmos de consenso complexos.
Visão Geral da Arquitetura
Cada nó no armazenamento Chave-Valor distribuído opera de forma independente, mas se comunica com outros nós para alcançar a consistência eventual.
- Comunicação Cliente-Nó (gRPC): Clientes interagem com qualquer nó no cluster usando gRPC para realizar operações de
Put
(escrita) eGet
(leitura). - Comunicação Nó-Nó (MQTT): Os nós usam um broker MQTT para difundir atualizações (replicar dados) para todos os outros nós inscritos. Isso permite uma propagação de dados assíncrona e desacoplada.
Diagrama Conceitual:
- Servidores se subscrevem para receber atualizações.
- Clientes fazem requisições a servidores individuais.
- Atualizações são publicadas e entregues a todos os servidores pelo Broker.
Estruturas de Dados
O armazenamento chave-valor irá guardar dados de forma que permita que múltiplas versões concorrentes do valor de uma chave e seu histórico causal sejam rastreados.
-
VectorClock
:- Tipo: Um mapa/dicionário onde as chaves são
NodeID
(string) e os valores sãoCounter
(inteiro de 64 bits sem sinal). - Propósito: Rastreia o histórico causal de um dado. Cada entrada
(NodeID, Counter)
indica que esta versão do dado viu todas as atualizações deNodeID
até oCounter
especificado.
- Tipo: Um mapa/dicionário onde as chaves são
-
Version
:- Tipo: Uma estrutura/objeto que representa uma única versão de um valor para uma dada chave.
- Campos:
value
: String (o dado real armazenado).vector_clock
:VectorClock
(o vector clock associado a esta versão específica).timestamp
: Inteiro de 64 bits sem sinal (nanosegundos desde a Época Unix quando esta versão foi criada). Usado como desempate para versões concorrentes.writer_node_id
: String (oNodeID
do nó que originalmente escreveu esta versão).
-
StoreEntry
:- Tipo: Uma estrutura/objeto que representa todas as versões ativas para uma chave específica.
- Campos:
key
: String (a chave para a qual os valores estão armazenados).versions
: Lista/Array de objetosVersion
(contém todas as versões atualmente ativas e não substituídas para esta chave).
-
NodeState
:- Tipo: O objeto de estado central para um único nó.
- Campos:
node_id
: String (Um identificador único para este nó).store
: Um mapa/dicionário concorrente que mapeia chaves para seus objetosStoreEntry
.
Algoritmos
O núcleo do modelo de consistência reside em como os VectorClock
s são gerenciados e como as operações de Put
e Get
interagem com eles.
-
merge_vector_clocks(vc1, vc2)
:- Entrada: Dois
VectorClock
s,vc1
evc2
. - Saída: Um novo
VectorClock
.- Para cada
NodeID
presente emvc1
ouvc2
, o clock mesclado assume o valor máximo do contador. Se umNodeID
estiver presente apenas em um clock, seu contador é assumido diretamente.
- Para cada
- Exemplo:
vc1 = {A:1, B:2}
vc2 = {A:2, C:1}
merged = {A:2, B:2, C:1}
- Entrada: Dois
-
compare_vector_clocks(vc1, vc2)
:- Entrada: Dois
VectorClock
s,vc1
evc2
. - Saída:
true
sevc1
aconteceu antes devc2
(ou seja,vc1
é estritamente menor ou igual avc2
em todos os componentes, e estritamente menor em pelo menos um, ouvc2
tem componentes que não estão emvc1
).false
sevc2
aconteceu antes devc1
(simétrico atrue
).null
/None
sevc1
evc2
forem concorrentes (nenhum aconteceu antes do outro).
- Entrada: Dois
-
process_put(key, new_version_value, is_replication_source)
:- Entrada:
key
(string),new_version_value
(string),is_replication_source
(booleano -true
se este PUT for de uma mensagem de replicação MQTT,false
se for de um cliente gRPC). - Saída:
void
(ou erro indicando falha). -
Lógica: a. Criar nova versão para inclusão: * Se
!is_replication_source
(ou seja, PUT iniciado pelo cliente): * adicione os valores adequados paratimestamp
e owriter_node_id
. * atualize o Vector Clock para a chave, fazendo merge com o valor anterior, caso necessário. * Caso contrário (is_replication_source
): * O objetonew_version
recebido já conterá seutimestamp
,writer_node_id
evector_clock
. Use-os diretamente.b. Resolução de Conflitos / Gerenciamento de Versões: * A nova versão obtida deverá substituir qualquer versão precedente no
StoreEntry
dostore
. * Caso a nova versão seja concorrente com alguma versão nostore
, ambas deverão ser mantidas noStoreEntry
. * Compare os vector clocks para saber se há ou não concorrência entre as versões. * Não deve haver versões repetidas para a mesma chave noStoreEntry
.c. Replicar (se não for fonte de replicação): * Se
!is_replication_source
(ou seja, este PUT originou-se de um cliente neste nó): * Serialize oStoreEntry
atualizado em uma string JSON. * Publique esta string JSON no tópico MQTTkvstore/replication
com QoS 1 (AtLeastOnce). Isso garante que outros nós recebam a atualização. * Esta publicação deve ocorrer de forma assíncrona (ex: em uma thread/goroutine
/tarefa separada) para evitar bloquear a resposta gRPC.
- Entrada:
-
process_get(key)
:- Entrada:
key
(string). - Saída: Lista/Array de objetos
Version
(ou lista vazia se a chave não for encontrada). - Lógica:
a. Recupere o
StoreEntry
parakey
. Se não for encontrado, retorne uma lista vazia. b. Retorne apenas os objetosVersion
s ativos, ou seja, versões concorrentes ou versões que não precedem nenhum outra versão ativa.
- Entrada:
Protocolos de Comunicação
-
API gRPC (
kv_store.proto
): Deve-se usar, obrigatoriamente, a seguinte definição.proto
para gerar o código do cliente (previamente fornecido) e do servidor para a linguagem escolhida.1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60
syntax = "proto3"; package kvstore; // O principal serviço de armazenamento chave-valor service KvStore { // Armazena um par chave-valor. Aciona a replicação para outros nós. rpc Put (PutRequest) returns (PutResponse); // Recupera todas as versões ativas para uma dada chave. rpc Get (GetRequest) returns (GetResponse); } // --- Estruturas de Dados --- // Representa uma única entrada em um Vector Clock. message VectorClockEntry { string node_id = 1; uint64 counter = 2; } // Um Vector Clock, representado como uma lista de entradas. message VectorClock { repeated VectorClockEntry entries = 1; } // Representa uma versão específica de um valor para uma chave. message Version { string value = 1; // O vector clock associado a esta versão. Opcional para flexibilidade. VectorClock vector_clock = 2; uint64 timestamp = 3; // Nanosegundos desde a época Unix. string writer_node_id = 4; // O ID do nó que originalmente escreveu esta versão. } // --- Mensagens de Requisição --- // Requisição para a operação Put. message PutRequest { string key = 1; string value = 2; } // Requisição para a operação Get. message GetRequest { string key = 1; } // --- Mensagens de Resposta --- // Resposta para a operação Put. message PutResponse { bool success = 1; string error_message = 2; } // Resposta para a operação Get. message GetResponse { repeated Version versions = 1; // Todas as versões ativas encontradas para a chave. string error_message = 2; }
-
API MQTT:
- Broker: Qualquer broker MQTT padrão (ex: Mosquitto) pode ser usado.
- Tópico:
kvstore/replication
- Payload: Quando um nó origina uma requisição
Put
(ou seja,is_replication_source
éfalse
emprocess_put
), ele deve serializar oStoreEntry
completo (não apenas aVersion
) para JSON e publicá-lo neste tópico. - Qualidade de Serviço (QoS): QoS 1 (AtLeastOnce) deve ser usado para publicação e assinatura para garantir que as mensagens sejam entregues.
Detalhes da Implementação do Servidor
-
Argumentos de Linha de Comando: O servidor deve aceitar os seguintes argumentos de linha de comando:
--node-id <ID>
: Um identificador de string único para este nó (ex:node_A
,node_B
). Se não for fornecido, gere um UUID.--listen-addr <ADDRESS>
: O endereço IP e a porta onde o servidor gRPC irá escutar (ex:127.0.0.1:50051
).--mqtt-broker-addr <ADDRESS>
: O endereço IP do broker MQTT. Se não for fornecido, utilize127.0.0.1
.--mqtt-broker-port <PORT>
: A porta do broker MQTT. Se não for fornecida, utilize1883
.
-
Gerenciamento de Concorrência:
- O
store
doNodeState
deve ser thread-safe. Use primitivas de sincronização apropriadas para sua linguagem (ex:Mutex
em Rust,sync.Mutex
/sync.RWMutex
em Go,threading.Lock
para um dicionário em Python,ConcurrentHashMap
em Java). - A publicação MQTT deve ser geralmente não bloqueante ou assíncrona da thread de tratamento de requisições gRPC.
- O
-
Fluxo Lógico Principal do Servidor:
- Analisar os argumentos de linha de comando.
- Inicializar a conexão do cliente MQTT com o broker especificado.
- Criar uma instância de
NodeState
. - Disparar uma Tarefa/Thread de Escuta MQTT: Esta unidade concorrente separada irá:
- Assinar o tópico
kvstore/replication
. - Continuamente buscar por mensagens MQTT recebidas.
- Quando uma mensagem no
kvstore/replication
for recebida:- Desserializar o payload JSON de volta para um objeto
StoreEntry
. - Para cada
Version
dentro doStoreEntry
desserializado, chamarprocess_put(key, version, true)
(importante:is_replication_source
deve sertrue
).
- Desserializar o payload JSON de volta para um objeto
- Assinar o tópico
- Iniciar o Servidor gRPC: Isso exporá os métodos
Put
eGet
aos clientes.- O método gRPC
Put
chamaráprocess_put(key, value, false)
. - O método gRPC
Get
chamaráprocess_get(key)
.
- O método gRPC
Requisitos do projeto
- Implementar Estruturas de Dados Essenciais: Traduza
VectorClock
,Version
eStoreEntry
para classes/estruturas na linguagem escolhida. - Implementar Algoritmos de Vector Clock: Implemente
merge_vector_clocks
ecompare_vector_clocks
corretamente. - Implementar
process_put
: Esta é a parte mais crítica para a consistência. Garanta que ela lide com atualizações de vector clock, resolução de conflitos e replicação MQTT condicional. - Implementar
process_get
: Garanta que ela retorne apenas versões ativas com base nas comparações de vector clock. - Implementação do Servidor gRPC:
- Use o arquivo
.proto
fornecido para gerar o código necessário. - Implemente o serviço
KvStore
com os RPCsPut
eGet
. - Integre com seu
NodeState
para chamarprocess_put
eprocess_get
.
- Use o arquivo
- Integração do Cliente MQTT:
- Use uma biblioteca de cliente MQTT adequada para sua linguagem.
- Configure uma tarefa/thread em segundo plano para assinar
kvstore/replication
. - Implemente a lógica para desserializar e processar mensagens de replicação recebidas.
- Implemente a lógica para serializar e publicar objetos
StoreEntry
após operaçõesPut
iniciadas pelo cliente.
- Análise de Argumentos de Linha de Comando: Implemente a análise de argumentos para
node-id
,listen-addr
,mqtt-broker-addr
emqtt-broker-port
.
Sugestões de bibliotecas por linguagem
- Java:
- gRPC: Use
grpc-java
. - MQTT: Use Eclipse Paho MQTT Client.
- Concorrência:
java.util.concurrent.ConcurrentHashMap
parastore
, blocossynchronized
ouReentrantLock
para travamento mais granular se necessário,ExecutorService
para tarefas em segundo plano. - JSON: Jackson ou Gson para serialização/desserialização de
StoreEntry
.
- gRPC: Use
- Go:
- gRPC: Use
google.golang.org/grpc
. - MQTT: Use
github.com/eclipse/paho.mqtt.golang
. - Concorrência:
sync.Mutex
ousync.RWMutex
parastore
,go
routines para tarefas concorrentes. - JSON: Pacote
encoding/json
.
- gRPC: Use
- Python:
- gRPC: Use
grpcio
. - MQTT: Use
paho-mqtt
. - Concorrência:
threading.Lock
parastore
(umdict
),threading.Thread
ouasyncio
para tarefas concorrentes. - JSON: Módulo
json
.
- gRPC: Use
- Rust:
- gRPC: Use
tonic
. - MQTT: Use
rumqttc
. - Concorrência:
tokio::sync::Mute
parastore
,tokio::spawn
para tarefas assíncronas. - JSON:
serde
comserde_json
.
- gRPC: Use
Teste e Verificação
- Configuração: É necessário um broker MQTT em execução (ex: Mosquitto).
- Funcionalidade Básica:
- Inicie vários nós (ex: Nó A na 50051, Nó B na 50052, Nó C na 50053).
- Use um cliente gRPC para realizar um
Put
de um par chave-valor no Nó A. - Verifique se as requisições
Get
para o Nó A retornam o valor correto. - Após um pequeno atraso (para replicação), as requisições
Get
para o Nó B e o Nó C também devem retornar o valor.
- Resolução de Conflitos:
- Realize um
Put
dekey="teste"
comvalue="primeiro"
no Nó A. - Quase simultaneamente, realize um
Put
dekey="teste"
comvalue="segundo"
no Nó B (antes que o Nó B receba a replicação do Nó A). - Após a replicação se estabilizar em todos os nós, uma requisição
Get
parakey="teste"
de qualquer nó deve retornar ambos "primeiro" e "segundo" como versões ativas, demonstrando concorrência.
- Realize um
- Substituição (Supersedence):
- Realize um
Put
dekey="exemplo"
comvalue="original"
no Nó A. - Espere pela replicação para todos os nós.
- Realize um
Put
dekey="exemplo"
comvalue="atualizado"
no Nó B. (O Nó B agora tem a versão original e a atualiza). - Após a replicação se estabilizar, as requisições
Get
parakey="exemplo"
de qualquer nó devem retornar apenas "atualizado", pois "original" deve ter sido substituída.
- Realize um
Detalhamento da Entrega
- Todos os arquivos necessários para execução do servidor devem ser colocados em um repositório privado no
github.com
com acesso liberado para meu usuário (paulo-coelho
). - Os trabalhos devem ser feitos, obrigatoriamente, em grupos de 4 alunos. Não serão aceitos trabalhos individuais, ou seja, será atribuída nota zero.
- O repositório deve conter, além do código-fonte, um arquivo
README.md
detalhando:- O que foi e o que não foi implementado.
- Como compilar o código.
- Como executar cada servidor.
- Principais dificuldades encontradas.
- Detalhamento das estruturas de dados que armazenam as chaves/valores/versões.
- Implementações que fizerem alterações no arquivo
.proto
ou que não funcionarem com o cliente fornecido não serão corrigidas e receberão nota zero. - O repositório deve conter, na pasta raiz, um script para compilação denominado
compile.sh
e um script para executar o servidor com os parâmetros necessários, denominadoserver.sh
. - Apenas as linguagens listadas a seguir serão aceitas. Trabalhos em outras linguagens não serão corrigidos e receberão nota zero.
- Go
- Java
- Python
- Rust
Este projeto oferece uma base sólida para a compreensão de aspectos críticos de sistemas distribuídos em um ambiente prático. Bom trabalho a todos!