NATS —— Pub/Sub JetStream 2026
****Kafka RabbitMQ Pulsar —— NATS NATS Pub/Sub JetStream Go/Python/Java
- NATS
- NATS Core NATS JetStream
- 10 NATS
- Core NATS
- JetStream
- Go
- Python
- Java Spring Boot
- TLSTokenJWT
- NATS
- NATS vs Kafka vs RabbitMQ vs Pulsar
- NATS
1. NATS
1.1
/ \
/ \
----
- Kafka
- RabbitMQErlang
- Pulsar
- Redis Streams
**NATS **
1.2 NATS
| NATS | ||
|---|---|---|
| **** | IOCore NATS | |
| ** 10M+ msg/s** | ||
| **** | Docker 20MB | |
| ** + ** | ||
| JetStream | At-Least-Once / Exactly-Once | |
| **Account ** | ||
| TLS/mTLS/JWT | ||
| **60+ ** | Go/Java/Python/Rust/C#/JS |
1.3 NATS
GitHub -
Netflix -
Walmart -
Siemens - IoT
Alibaba -
1.4 NATS
NATS
Core NATS NATS
- Pub/SubRequest/Reply
JetStream
- At-Least-OnceExactly-Once
2. NATS Core NATS JetStream
2.1 Core NATS
NATS Server
Subject
orders.* → [Client1, Client2]
users.> → [Client3]
system.health → [Monitor1, Monitor2]
Client A publish("orders.new") →
Client B ← "orders.new" (Sub)
Client C ← "orders.new" (Queue Sub)
SubjectNATS Kafka Topic
orders.neworders.*``orders.new,orders.paidusers.>``users.profile.update
Pub/Sub/
Queue Groups
Request/Reply/ RPC
2.2 JetStream
JetStream
Stream Consumer Storage
() → () ()
orders push-sub File/Memory
users pull-sub
Stream
- Max Msgs: 1000000
- Max Bytes: 1GB
- Max Age: 30d
- Replicas: 3 ()
**JetStream **
Stream
- Subject Stream
Consumer Stream
- Push Consumer
- Pull Consumer
- ****RedeliveryAckMax Deliveries
**Exactly-Once **
Publish with MsgId → NATS → → Consumer Ack → /
2.3 Subject
# bad
order_created
user_updated
# good
{domain}.{entity}.{action}
-------------------------------
shop.orders.created
shop.orders.paid
shop.orders.shipped
shop.users.registered
shop.users.updated
shop.inventory.adjusted
#
system.services.started
system.services.stopped
system.health.check
3. 10 NATS
3.1 Docker
# NATS Server JetStream
docker run -d \
--name nats \
-p 4222:4222 \
-p 8222:8222 \
-p 4223:4223 \
nats:latest \
-js -m 8222
#
# -js : JetStream
# -m 8222 : HTTP
# -p 4223 : MQTT
#
docker logs nats
#
# [INF] Starting nats-server
# [INF] JetStream is enabled
# [INF] Listening for client connections on 0.0.0.0:4222
3.2
# Linux/macOS
curl -L https://github.com/nats-io/nats-server/releases/download/v2.10.20/nats-server-v2.10.20-linux-amd64.tar.gz | tar xz
cd nats-server-v2.10.20-linux-amd64
sudo cp nats-server /usr/local/bin/
#
nats-server --version
# nats-server: v2.10.20
3.3
/etc/nats/nats.conf
# NATS Server
#
port: 4222
# JetStream
jetstream {
store_dir: "/var/lib/nats/jetstream"
max_memory_store: 1GB
max_file_store: 10GB
}
#
http_port: 8222
#
cluster {
name: "prod-cluster"
listen: "0.0.0.0:6222"
routes: [
"nats://192.168.1.10:6222"
"nats://192.168.1.11:6222"
"nats://192.168.1.12:6222"
]
}
#
authorization {
# Token
token: "s3cr3t-t0k3n"
#
users: [
{user: "app", password: "app-pass", permissions: {
publish: ["app.>"],
subscribe: ["app.>", "system.>"]
}},
{user: "monitor", password: "mon-pass", permissions: {
publish: [],
subscribe: ["*"]
}}
]
}
# TLS
tls {
cert_file: "/etc/nats/tls/server.crt"
key_file: "/etc/nats/tls/server.key"
ca_file: "/etc/nats/tls/ca.crt"
verify: true
}
#
log_file: "/var/log/nats/nats-server.log"
logtime: true
debug: false
trace: false
nats-server -c /etc/nats/nats.conf
# systemd9
3.4 nats-cli
# CLI
curl -L https://github.com/nats-io/natscli/releases/download/v0.1.0/nats-v0.1.0-linux-amd64.tar.gz | tar xz
sudo cp nats /usr/local/bin/
#
nats server info
# JetStream
nats jetstream status
# Stream
nats jetstream stream list
# Stream
nats jetstream stream info ORDERS
4. Core NATS
4.1 Pub/Sub /
**Go **
package main
import (
"context"
"fmt"
"log"
"time"
"github.com/nats-io/nats.go"
)
func main() {
// NATS
nc, err := nats.Connect(
"nats://localhost:4222",
nats.Name("Publisher-1"),
nats.Timeout(10*time.Second),
)
if err != nil {
log.Fatal(err)
}
defer nc.Close()
ctx := context.Background()
// ========== ==========
//
err = nc.Publish("orders.new", []byte("Order #12345 created"))
if err != nil {
log.Fatal(err)
}
// Header JetStream
js, _ := nc.JetStream()
_, err = js.Publish("orders.new", []byte(`{
"order_id": "12345",
"user_id": "user-001",
"amount": 99.99,
"items": [{"sku": "ABC", "qty": 2}]
}`), nats.MsgId("order-12345")) //
if err != nil {
log.Fatal(err)
}
fmt.Println(" Message published")
// ========== ==========
//
sub, err := nc.Subscribe("orders.*", func(msg *nats.Msg) {
fmt.Printf(" Received: subject=%s, data=%s\n",
msg.Subject, string(msg.Data))
})
if err != nil {
log.Fatal(err)
}
defer sub.Unsubscribe()
//
subAll, _ := nc.Subscribe(">", func(msg *nats.Msg) {
fmt.Printf(" All Subjects: %s\n", msg.Subject)
})
defer subAll.Unsubscribe()
//
select {} // graceful shutdown
}
4.2 Queue Groups /
// ========== ==========
//
func startWorker(workerID string) {
nc, _ := nats.Connect("nats://localhost:4222")
defer nc.Close()
// QueueSubscribe
_, err := nc.QueueSubscribe("orders.process", "order-workers", func(msg *nats.Msg) {
fmt.Printf(" Worker %s processing: %s\n", workerID, string(msg.Data))
time.Sleep(100 * time.Millisecond) //
fmt.Printf(" Worker %s done\n", workerID)
})
if err != nil {
log.Fatal(err)
}
//
<-make(chan struct{})
}
// 3 Worker goroutine
go startWorker("Worker-1")
go startWorker("Worker-2")
go startWorker("Worker-3")
// 10
for i := 0; i < 10; i++ {
nc.Publish("orders.process", []byte(fmt.Sprintf("Task-%d", i)))
}
// 3 Worker 10
**Queue Group **
Publisher → NATS Server → Queue Group "order-workers"
Worker-1 ( Task-1, Task-4, Task-7)
Worker-2 ( Task-2, Task-5, Task-8)
Worker-3 ( Task-3, Task-6, Task-9, Task-10)
4.3 Request/Reply RPC
// ========== Request/Reply ==========
// Responder
func startService() {
nc, _ := nats.Connect("nats://localhost:4222")
_, err := nc.Subscribe("user.get", func(msg *nats.Msg) {
userID := string(msg.Data)
//
userData := fmt.Sprintf(`{"id":"%s","name":"John Doe","email":"john@example.com"}`, userID)
// msg.Respond
err := msg.Respond([]byte(userData))
if err != nil {
log.Printf("Reply failed: %v", err)
}
})
if err != nil {
log.Fatal(err)
}
fmt.Println(" User Service started")
<-make(chan struct{})
}
// Requester
func callService() {
nc, _ := nats.Connect("nats://localhost:4222")
defer nc.Close()
//
msg, err := nc.Request("user.get", []byte("user-001"), 2*time.Second)
if err != nil {
log.Fatalf("Request failed: %v", err)
}
fmt.Printf(" Response: %s\n", string(msg.Data))
// Response: {"id":"user-001","name":"John Doe","email":"john@example.com"}
}
//
go startService()
time.Sleep(100 * time.Millisecond) //
callService()
**Request/Reply **
//
func requestWithRetry(nc *nats.Conn, subject string, data []byte, maxRetries int) (*nats.Msg, error) {
var lastErr error
for i := 0; i < maxRetries; i++ {
msg, err := nc.Request(subject, data, 2*time.Second)
if err == nil {
return msg, nil
}
lastErr = err
fmt.Printf(" Attempt %d failed: %v\n", i+1, err)
time.Sleep(time.Duration(i+1) * 100 * time.Millisecond) //
}
return nil, fmt.Errorf("all %d attempts failed: %w", maxRetries, lastErr)
}
5. JetStream
5.1 Stream
package main
import (
"fmt"
"log"
"time"
"github.com/nats-io/nats.go"
)
func setupJetStream() {
nc, err := nats.Connect("nats://localhost:4222")
if err != nil {
log.Fatal(err)
}
defer nc.Close()
js, err := nc.JetStream()
if err != nil {
log.Fatal(err)
}
// ========== Stream ==========
// Stream
streamName := "ORDERS"
subjects := []string{"orders.>"}
// Stream
_, err = js.StreamInfo(streamName)
if err != nil {
// Stream
_, err = js.AddStream(&nats.StreamConfig{
Name: streamName,
Subjects: subjects,
//
Retention: nats.LimitsPolicy, // 3
//
MaxMsgs: 1000000, // 100
MaxBytes: 10 * 1024 * 1024 * 1024, // 10GB
MaxAge: 30 * 24 * time.Hour, // 30
//
Storage: nats.FileStorage, // FileStorage/ MemoryStorage
//
Replicas: 1, // =1=3
// MsgId
Duplicates: 2 * time.Hour, // 2 MsgId
// Kafka Partition
NumPartitions: 0, // 0 =
})
if err != nil {
log.Fatalf("Failed to add stream: %v", err)
}
fmt.Println(" Stream created:", streamName)
} else {
fmt.Println("ℹ Stream already exists:", streamName)
}
// ========== Stream ==========
// JetStream Publish
ack, err := js.Publish("orders.new", []byte(`{
"order_id": "ORD-2024-001",
"user_id": "user-123",
"items": [
{"sku": "IPHONE15", "qty": 1, "price": 999.99}
],
"total": 999.99,
"created_at": "2024-01-15T10:30:00Z"
}`),
nats.MsgId("ORD-2024-001"), // MsgId
)
if err != nil {
log.Fatalf("Publish failed: %v", err)
}
fmt.Printf(" Message published: stream=%s, seq=%d\n",
ack.Stream, ack.Seq)
}
5.2 Consumer
func setupConsumer() {
nc, _ := nats.Connect("nats://localhost:4222")
defer nc.Close()
js, _ := nc.JetStream()
// ========== Pull Consumer==========
//
_, err := js.AddConsumer("ORDERS", &nats.ConsumerConfig{
Durable: "order-processor", //
Description: "Process orders from ORDERS stream",
//
DeliverPolicy: nats.DeliverAllPolicy, //
AckPolicy: nats.AckExplicitPolicy, // Ack
// Ack
MaxDeliver: 5, // 5
AckWait: 30 * time.Second, // Ack
//
MaxAckPending: 100, // 100 Ack prefetch
// Filter Subject
FilterSubject: "orders.new",
})
if err != nil {
//
fmt.Println("ℹ Consumer already exists")
}
// ========== ==========
sub, err := js.PullSubscribe("orders.new", "order-processor")
if err != nil {
log.Fatal(err)
}
// 10
msgs, err := sub.Fetch(10, nats.MaxWait(5*time.Second))
if err != nil {
log.Printf("Fetch failed: %v", err)
return
}
fmt.Printf(" Fetched %d messages\n", len(msgs))
for _, msg := range msgs {
//
fmt.Printf(" - Seq=%d, Data=%s\n", msg.Sequence, string(msg.Data))
//
err := msg.Ack()
if err != nil {
log.Printf("Ack failed: %v", err)
}
}
}
// ========== Push Consumer==========
//
func setupPushConsumer() {
nc, _ := nats.Connect("nats://localhost:4222")
defer nc.Close()
js, _ := nc.JetStream()
// Push Consumer
_, err := js.AddConsumer("ORDERS", &nats.ConsumerConfig{
Durable: "order-notifier",
DeliverPolicy: nats.DeliverAllPolicy,
AckPolicy: nats.AckExplicitPolicy,
})
if err != nil {
// ignore
}
//
_, err = js.Subscribe("orders.>", func(msg *nats.Msg) {
fmt.Printf(" Push received: subject=%s, seq=%d\n",
msg.Subject, msg.Sequence)
//
// ...
// Ack
msg.Ack()
},
nats.Durable("order-notifier"),
nats.ManualAck(), // Ack
)
if err != nil {
log.Fatal(err)
}
fmt.Println(" Push consumer started")
<-make(chan struct{}) //
}
5.3
// JetStream Queue Subscribe
func startJetStreamWorker(workerID string) {
nc, _ := nats.Connect("nats://localhost:4222")
defer nc.Close()
js, _ := nc.JetStream()
// QueueSubscribe + Durable
// Durable + =
_, err := js.QueueSubscribe("orders.>", "order-workers", func(msg *nats.Msg) {
fmt.Printf(" Worker %s processing: seq=%d\n", workerID, msg.Sequence)
//
time.Sleep(50 * time.Millisecond)
// Ack
msg.Ack()
fmt.Printf(" Worker %s done\n", workerID)
},
nats.Durable("order-workers"), //
nats.ManualAck(),
)
if err != nil {
log.Fatal(err)
}
fmt.Printf(" Worker %s started\n", workerID)
<-make(chan struct{})
}
// 3 Worker/
go startJetStreamWorker("Worker-A")
go startJetStreamWorker("Worker-B")
go startJetStreamWorker("Worker-C")
5.4 Exactly-Once
// MsgId
// nats.MsgId()
func publishWithDeduplication(js nats.JetStreamContext, orderID string, orderData []byte) error {
// MsgId
msgID := fmt.Sprintf("order-%s", orderID)
_, err := js.Publish("orders.new", orderData,
nats.MsgId(msgID), //
nats.ExpectStream("ORDERS"), // Stream
)
//
if err == nats.ErrKeyExists {
fmt.Printf(" Duplicate message detected: %s (ignored)\n", msgID)
return nil //
}
return err
}
//
for i := 0; i < 5; i++ {
err := publishWithDeduplication(js, "ORD-001", []byte("order data"))
if err != nil {
log.Printf("Publish failed: %v", err)
}
}
// Stream 4
6. Go
6.1
nats-producer-consumer/
cmd/
producer/
main.go
consumer/
main.go
internal/
nats/
client.go
publisher.go
subscriber.go
config/
config.go
pkg/
models/
order.go
docker-compose.yml
Dockerfile
go.mod
6.2 config.go
package config
import (
"time"
)
type Config struct {
NATS NATSConfig `yaml:"nats"`
App AppConfig `yaml:"app"`
Logging LoggingConfig `yaml:"logging"`
}
type NATSConfig struct {
URL string `yaml:"url"`
Token string `yaml:"token"`
TLS bool `yaml:"tls"`
TLSCert string `yaml:"tls_cert"`
TLSKey string `yaml:"tls_key"`
TLSRootCA string `yaml:"tls_root_ca"`
// JetStream
StreamName string `yaml:"stream_name"`
StreamSubjects []string `yaml:"stream_subjects"`
//
MaxReconnect int `yaml:"max_reconnect"`
ReconnectWait time.Duration `yaml:"reconnect_wait"`
Timeout time.Duration `yaml:"timeout"`
}
type AppConfig struct {
ServiceName string `yaml:"service_name"`
Environment string `yaml:"environment"`
ShutdownTimeout time.Duration `yaml:"shutdown_timeout"`
}
type LoggingConfig struct {
Level string `yaml:"level"`
Format string `yaml:"format"`
Output string `yaml:"output"`
}
func LoadConfig() *Config {
return &Config{
NATS: NATSConfig{
URL: "nats://localhost:4222",
Token: "s3cr3t-t0k3n",
StreamName: "ORDERS",
StreamSubjects: []string{"orders.>"},
MaxReconnect: 10,
ReconnectWait: 2 * time.Second,
Timeout: 5 * time.Second,
},
App: AppConfig{
ServiceName: "order-service",
Environment: "production",
ShutdownTimeout: 30 * time.Second,
},
Logging: LoggingConfig{
Level: "info",
Format: "json",
Output: "stdout",
},
}
}
6.3 NATS client.go
package nats
import (
"fmt"
"sync"
"time"
"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
)
type Client struct {
conn *nats.Conn
js jetstream.JetStream
mu sync.RWMutex
config *Config
}
type Config struct {
URL string
Token string
TLSEnabled bool
TLSCert string
TLSKey string
TLSRootCA string
MaxReconnect int
ReconnectWait time.Duration
Timeout time.Duration
}
func NewClient(cfg *Config) (*Client, error) {
opts := []nats.Option{
nats.Name("OrderService-Client"),
nats.MaxReconnects(cfg.MaxReconnect),
nats.ReconnectWait(cfg.ReconnectWait),
nats.Timeout(cfg.Timeout),
nats.Token(cfg.Token),
nats.DisconnectErrHandler(func(nc *nats.Conn, err error) {
fmt.Printf(" NATS disconnected: %v\n", err)
}),
nats.ReconnectHandler(func(nc *nats.Conn) {
fmt.Printf(" NATS reconnected: %s\n", nc.ConnectedUrl())
}),
nats.ClosedHandler(func(nc *nats.Conn) {
fmt.Printf(" NATS connection closed\n")
}),
}
// TLS
if cfg.TLSEnabled {
// tls.LoadX509KeyPair()
// opts = append(opts, nats.ClientCert(...))
}
conn, err := nats.Connect(cfg.URL, opts...)
if err != nil {
return nil, fmt.Errorf("failed to connect to NATS: %w", err)
}
// JetStream
js, err := jetstream.New(conn)
if err != nil {
conn.Close()
return nil, fmt.Errorf("failed to get JetStream context: %w", err)
}
return &Client{
conn: conn,
js: js,
config: cfg,
}, nil
}
func (c *Client) Connection() *nats.Conn {
return c.conn
}
func (c *Client) JetStream() jetstream.JetStream {
return c.js
}
func (c *Client) Close() {
c.mu.Lock()
defer c.mu.Unlock()
if c.conn != nil && !c.conn.IsClosed() {
fmt.Println(" Closing NATS connection...")
c.conn.Close()
}
}
//
func (c *Client) HealthCheck() error {
if c.conn == nil || c.conn.IsClosed() {
return fmt.Errorf("NATS connection is closed")
}
if !c.conn.IsConnected() {
return fmt.Errorf("NATS is in disconnected state")
}
return nil
}
6.4 publisher.go
package nats
import (
"context"
"encoding/json"
"fmt"
"time"
"github.com/nats-io/nats.go/jetstream"
"github.com/yourorg/nats-demo/pkg/models"
)
type Publisher struct {
client *Client
stream string
}
func NewPublisher(client *Client, stream string) *Publisher {
return &Publisher{
client: client,
stream: stream,
}
}
// PublishOrder
func (p *Publisher) PublishOrder(ctx context.Context, order *models.Order) error {
//
data, err := json.Marshal(order)
if err != nil {
return fmt.Errorf("failed to marshal order: %w", err)
}
// MsgId
msgID := fmt.Sprintf("order-%s", order.ID)
// JetStream
ack, err := p.client.JetStream().Publish(
ctx,
"orders.new",
data,
jetstream.WithMsgID(msgID),
jetstream.WithExpectStream(p.stream),
jetstream.WithRetryAttempts(3),
)
if err != nil {
return fmt.Errorf("failed to publish order: %w", err)
}
fmt.Printf(" Order published: stream=%s, seq=%d, order_id=%s\n",
ack.Stream, ack.Seq, order.ID)
return nil
}
// PublishBatch
func (p *Publisher) PublishBatch(ctx context.Context, orders []*models.Order) error {
for _, order := range orders {
if err := p.PublishOrder(ctx, order); err != nil {
return err
}
}
return nil
}
// AsyncPublish
func (p *Publisher) AsyncPublish(order *models.Order) <-chan error {
errCh := make(chan error, 1)
go func() {
defer close(errCh)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
errCh <- p.PublishOrder(ctx, order)
}()
return errCh
}
6.5 subscriber.go
package nats
import (
"context"
"encoding/json"
"fmt"
"time"
"github.com/nats-io/nats.go/jetstream"
"github.com/yourorg/nats-demo/pkg/models"
)
type Subscriber struct {
client *Client
stream string
consumer string
concurrency int
}
func NewSubscriber(client *Client, stream, consumer string, concurrency int) *Subscriber {
return &Subscriber{
client: client,
stream: stream,
consumer: consumer,
concurrency: concurrency,
}
}
// Start Push
func (s *Subscriber) Start(ctx context.Context) error {
// Consumer
_, err := s.client.JetStream().AddConsumer(ctx, s.stream, jetstream.ConsumerConfig{
Durable: s.consumer,
DeliverPolicy: jetstream.DeliverAllPolicy,
AckPolicy: jetstream.AckExplicitPolicy,
MaxDeliver: 5,
AckWait: 30 * time.Second,
MaxAckPending: s.concurrency * 10, //
})
if err != nil {
return fmt.Errorf("failed to add consumer: %w", err)
}
// Worker
for i := 0; i < s.concurrency; i++ {
workerID := i
go s.worker(ctx, workerID)
}
fmt.Printf(" Subscriber started: stream=%s, consumer=%s, concurrency=%d\n",
s.stream, s.consumer, s.concurrency)
// Context
<-ctx.Done()
fmt.Println(" Subscriber stopped")
return nil
}
func (s *Subscriber) worker(ctx context.Context, workerID int) {
//
sub, err := s.client.JetStream().Subscribe(
ctx,
"orders.new",
s.consumer,
func(msg jetstream.Msg) {
s.handleMessage(ctx, workerID, msg)
},
jetstream.ManualAck(),
)
if err != nil {
fmt.Printf(" Worker %d failed to subscribe: %v\n", workerID, err)
return
}
defer sub.Unsubscribe()
fmt.Printf(" Worker %d started\n", workerID)
//
<-ctx.Done()
}
func (s *Subscriber) handleMessage(ctx context.Context, workerID int, msg jetstream.Msg) {
start := time.Now()
//
var order models.Order
if err := json.Unmarshal(msg.Data(), &order); err != nil {
fmt.Printf(" Worker %d failed to unmarshal: %v\n", workerID, err)
// Nak
msg.Nak()
return
}
fmt.Printf(" Worker %d processing: order_id=%s, subject=%s\n",
workerID, order.ID, msg.Subject())
//
if err := s.processOrder(ctx, &order); err != nil {
fmt.Printf(" Worker %d processing failed: %v\n", workerID, err)
//
if isRetryableError(err) {
msg.Nak() //
} else {
msg.Ack() // Ack
// TODO:
}
return
}
// Ack
if err := msg.Ack(); err != nil {
fmt.Printf(" Worker %d failed to ack: %v\n", workerID, err)
}
elapsed := time.Since(start)
fmt.Printf(" Worker %d done: order_id=%s, elapsed=%s\n",
workerID, order.ID, elapsed)
}
func (s *Subscriber) processOrder(ctx context.Context, order *models.Order) error {
//
time.Sleep(50 * time.Millisecond)
//
// 1.
// 2.
// 3.
// 4.
return nil
}
func isRetryableError(err error) bool {
//
//
//
return false //
}
6.6 Docker Compose
# docker-compose.yml
version: '3.8'
services:
nats:
image: nats:latest
container_name: nats-server
command: >
-js
-m 8222
--store_dir /data/jetstream
--max_memory_store 1GB
--max_file_store 10GB
ports:
- "4222:4222" # Client
- "8222:8222" # Monitoring
- "6222:6222" # Cluster
volumes:
- nats-data:/data
restart: unless-stopped
healthcheck:
test: ["CMD", "wget", "--quiet", "--tries=1", "--spider", "http://localhost:8222/healthz"]
interval: 10s
timeout: 5s
retries: 5
nats-exporter:
image: natsio/prometheus-nats-exporter:latest
container_name: nats-exporter
command: >
--connz
--varz
--subz
--channelz
http://nats:8222
ports:
- "7777:7777"
depends_on:
- nats
restart: unless-stopped
producer:
build: .
container_name: nats-producer
command: ["./producer"]
environment:
- NATS_URL=nats://nats:4222
depends_on:
- nats
restart: unless-stopped
consumer:
build: .
container_name: nats-consumer
command: ["./consumer"]
deploy:
replicas: 3 # 3
environment:
- NATS_URL=nats://nats:4222
depends_on:
- nats
restart: unless-stopped
volumes:
nats-data:
7. Python
7.1
pip install nats-py
7.2 Python
import asyncio
import json
import nats
from nats.js import JetStreamContext
async def produce_orders():
# NATS
nc = await nats.connect("nats://localhost:4222")
# JetStream
js = nc.jetstream()
# Stream
try:
await js.add_stream(
name="ORDERS",
subjects=["orders.>"],
retention="limits",
max_msgs=1000000,
max_bytes=10 * 1024 * 1024 * 1024, # 10GB
max_age=30 * 24 * 3600 * 1_000_000_000, # 30
storage="file",
duplicates=2 * 3600 * 1_000_000_000, # 2
)
print(" Stream created")
except nats.js.errors.BadRequestError as e:
print(f"ℹ Stream already exists: {e}")
#
orders = [
{"order_id": "ORD-001", "user_id": "user-001", "total": 99.99},
{"order_id": "ORD-002", "user_id": "user-002", "total": 199.99},
{"order_id": "ORD-003", "user_id": "user-003", "total": 299.99},
]
for order in orders:
ack = await js.publish(
"orders.new",
json.dumps(order).encode(),
stream="ORDERS",
msg_id=f"order-{order['order_id']}" #
)
print(f" Published: stream={ack.stream}, seq={ack.seq}, order_id={order['order_id']}")
await nc.close()
if __name__ == "__main__":
asyncio.run(produce_orders())
7.3 Python
import asyncio
import json
import nats
from nats.js import JetStreamContext
async def consume_orders():
nc = await nats.connect("nats://localhost:4222")
js = nc.jetstream()
# Consumer
try:
await js.add_consumer(
"ORDERS",
durable_name="python-order-processor",
deliver_policy="all",
ack_policy="explicit",
max_deliver=5,
ack_wait=30,
max_ack_pending=100,
)
print(" Consumer created")
except Exception as e:
print(f"ℹ Consumer config: {e}")
# Push
async def message_handler(msg):
subject = msg.subject
data = json.loads(msg.data.decode())
print(f" Received: subject={subject}, order_id={data.get('order_id')}")
#
await asyncio.sleep(0.05) #
# Ack
await msg.ack()
print(f" Acked: order_id={data.get('order_id')}")
#
sub = await js.subscribe(
"orders.new",
"python-order-processor",
cb=message_handler,
manual_ack=True,
)
print(" Python consumer started. Waiting for messages...")
#
try:
while True:
await asyncio.sleep(1)
except KeyboardInterrupt:
print("\n Shutting down...")
await sub.unsubscribe()
await nc.close()
if __name__ == "__main__":
asyncio.run(consume_orders())
7.4 Python Pull Consumer
async def pull_consumer():
nc = await nats.connect("nats://localhost:4222")
js = nc.jetstream()
# Pull Consumer
await js.add_consumer(
"ORDERS",
durable_name="python-batch-processor",
deliver_policy="all",
ack_policy="explicit",
)
# Pull Subscribe
sub = await js.pull_subscribe(
"orders.new",
"python-batch-processor",
)
print(" Pull consumer started")
while True:
# 10
try:
msgs = await sub.fetch(batch=10, timeout=5)
print(f" Fetched {len(msgs)} messages")
for msg in msgs:
data = json.loads(msg.data.decode())
print(f" - Processing: order_id={data.get('order_id')}")
await msg.ack()
print(f" Batch completed")
except nats.errors.TimeoutError:
print("⏳ No messages available, waiting...")
await asyncio.sleep(2)
except Exception as e:
print(f" Error: {e}")
break
await nc.close()
8. Java Spring Boot
8.1 Maven
<dependencies>
<!-- NATS Java Client -->
<dependency>
<groupId>io.nats</groupId>
<artifactId>jnats</artifactId>
<version>2.19.0</version>
</dependency>
<!-- JetStream -->
<dependency>
<groupId>io.nats</groupId>
<artifactId>jetstream</artifactId>
<version>2.19.0</version>
</dependency>
<!-- Spring Boot Starter -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<!-- JSON Processing -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
</dependencies>
8.2 NATS
@Configuration
@ConfigurationProperties(prefix = "nats")
@Data
public class NatsConfig {
private String url;
private String token;
private boolean tlsEnabled;
private String streamName;
private List<String> streamSubjects;
@Bean
public Connection natsConnection() throws Exception {
Options options = new Options.Builder()
.server(url)
.token(token)
.maxReconnects(10)
.reconnectWait(Duration.ofSeconds(2))
.connectionTimeout(Duration.ofSeconds(5))
.errorListener(new ErrorListener() {
@Override
public void exceptionOccurred(Connection conn, Exception ex) {
log.error("NATS exception", ex);
}
@Override
public void disconnected(Connection conn) {
log.warn("NATS disconnected");
}
@Override
public void reconnected(Connection conn) {
log.info("NATS reconnected: {}", conn.getConnectedUrl());
}
})
.build();
return Nats.connect(options);
}
@Bean
public JetStream jetStream(Connection nc) throws Exception {
return nc.jetStream();
}
}
8.3 Spring Boot
@Service
@Slf4j
public class OrderPublisher {
@Autowired
private JetStream jetStream;
@Value("${nats.stream-name}")
private String streamName;
private final ObjectMapper objectMapper = new ObjectMapper();
@PostConstruct
public void initStream() {
try {
// Stream
StreamConfiguration streamConfig = StreamConfiguration.builder()
.name(streamName)
.subjects("orders.>")
.retentionPolicy(RetentionPolicy.Limits)
.maxMessages(1_000_000)
.maxBytes(10 * 1024 * 1024 * 1024L)
.maxAge(Duration.ofDays(30))
.storageType(StorageType.File)
.duplicates(2, TimeUnit.HOURS)
.build();
jetStream.addStream(streamConfig);
log.info(" Stream created: {}", streamName);
} catch (JetStreamApiException e) {
log.info("ℹ Stream already exists: {}", streamName);
} catch (Exception e) {
log.error("Failed to create stream", e);
}
}
public CompletableFuture<PublishAck> publishOrder(Order order) {
return CompletableFuture.supplyAsync(() -> {
try {
String msgId = "order-" + order.getId();
byte[] data = objectMapper.writeValueAsBytes(order);
PublishOptions opts = PublishOptions.builder()
.stream(streamName)
.messageId(msgId)
.build();
PublishAck ack = jetStream.publish(
"orders.new",
data,
opts
);
log.info(" Order published: stream={}, seq={}, orderId={}",
ack.getStream(), ack.getSeq(), order.getId());
return ack;
} catch (Exception e) {
log.error("Failed to publish order: {}", order.getId(), e);
throw new RuntimeException(e);
}
});
}
}
8.4 Spring Boot
@Service
@Slf4j
public class OrderConsumer {
@Autowired
private Connection nc;
@Autowired
private JetStream jetStream;
@Value("${nats.stream-name}")
private String streamName;
private Dispatcher dispatcher;
@PostConstruct
public void initConsumer() {
try {
// Consumer
ConsumerConfiguration consumerConfig = ConsumerConfiguration.builder()
.durable("spring-order-processor")
.deliverPolicy(DeliverPolicy.All)
.ackPolicy(AckPolicy.Explicit)
.maxDeliver(5)
.ackWait(30, TimeUnit.SECONDS)
.build();
jetStream.addConsumer(streamName, consumerConfig);
log.info(" Consumer created: spring-order-processor");
//
startSubscription();
} catch (Exception e) {
log.error("Failed to init consumer", e);
}
}
private void startSubscription() throws Exception {
// Dispatcher
dispatcher = nc.createDispatcher();
// Push
JetStreamSubscription sub = jetStream.subscribe(
"orders.new",
dispatcher,
this::handleMessage,
false, // auto-ack ( false Ack)
"spring-order-processor"
);
log.info(" Consumer started");
}
private void handleMessage(Message msg) {
try {
//
Order order = objectMapper.readValue(msg.getData(), Order.class);
log.info(" Received: orderId={}, subject={}",
order.getId(), msg.getSubject());
//
processOrder(order);
// Ack
msg.ack();
log.info(" Order processed: orderId={}", order.getId());
} catch (Exception e) {
log.error("Failed to process message", e);
// Nak
msg.nak();
}
}
private void processOrder(Order order) {
//
try {
Thread.sleep(50);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
@PreDestroy
public void cleanup() {
if (dispatcher != null) {
dispatcher.stop();
}
}
}
9.
9.1 NATS
NATS
Node-1 Node-2 Node-3
(Seed) (Peer) (Peer)
JetStream JetStream JetStream
Replica Replica Replica
Subject
Client A
Client B
9.2
Node-1
# nats-node1.conf
port: 4222
server_name: "nats-node-1"
# JetStream
jetstream {
store_dir: "/var/lib/nats/jetstream"
max_memory_store: 2GB
max_file_store: 50GB
}
#
http_port: 8222
#
cluster {
name: "production-cluster"
listen: "0.0.0.0:6222"
# routes
# routes: []
#
# IP
}
# JetStream Raft
jetstream {
store_dir: "/var/lib/nats/jetstream"
# peers
raft {
# Node-1 bootstrap
bootstrap: true
}
}
**Node-2 **
# nats-node2.conf
port: 4222
server_name: "nats-node-2"
jetstream {
store_dir: "/var/lib/nats/jetstream"
max_memory_store: 2GB
max_file_store: 50GB
}
http_port: 8222
cluster {
name: "production-cluster"
listen: "0.0.0.0:6222"
#
routes: [
"nats://192.168.1.10:6222" # Node-1
]
}
**Node-3 **
# nats-node3.conf
# ... Node-2routes Node-1 Node-2
cluster {
name: "production-cluster"
listen: "0.0.0.0:6222"
routes: [
"nats://192.168.1.10:6222",
"nats://192.168.1.11:6222"
]
}
9.3
# Node-1
nats-server -c /etc/nats/nats-node1.conf
# Node-2 Node-1
nats-server -c /etc/nats/nats-node2.conf
# Node-3
nats-server -c /etc/nats/nats-node3.conf
#
nats server ls --server nats://192.168.1.10:4222
# 3
9.4 Stream
// Stream Replicas=3
_, err := js.AddStream(&nats.StreamConfig{
Name: "ORDERS",
Subjects: []string{"orders.>"},
Retention: nats.LimitsPolicy,
MaxMsgs: 1000000,
MaxBytes: 10 * 1024 * 1024 * 1024,
MaxAge: 30 * 24 * time.Hour,
Storage: nats.FileStorage,
Replicas: 3, // 3
Duplicates: 2 * time.Hour,
})
Stream ORDERS (Replicas=3)
Leader Replica (Node-1) ←
Follower Replica (Node-2) ←
Follower Replica (Node-3) ←
1. Client → Leader (Node-1)
2. Leader
3. Leader to Follower-1 (Node-2)
4. Leader to Follower-2 (Node-3)
5. → Ack to Client
- Leader → Raft Leader
- Leader
9.5 systemd NATS
# /etc/systemd/system/nats.service
[Unit]
Description=NATS Server
After=network.target
[Service]
Type=simple
User=nats
Group=nats
ExecStart=/usr/local/bin/nats-server -c /etc/nats/nats.conf
Restart=on-failure
RestartSec=5s
LimitNOFILE=1000000
#
Environment="NATS_LOG_LEVEL=info"
#
KillMode=process
TimeoutStopSec=30
[Install]
WantedBy=multi-user.target
sudo systemctl daemon-reload
sudo systemctl enable nats
sudo systemctl start nats
sudo systemctl status nats
#
sudo journalctl -u nats -f
10. TLSTokenJWT
10.1 Token
# nats.conf
authorization {
# Token
token: "s3cr3t-t0k3n-2024"
# Token
# token_file: "/etc/nats/token"
}
nc, err := nats.Connect("nats://localhost:4222",
nats.Token("s3cr3t-t0k3n-2024"),
)
10.2 RBAC
# nats.conf
authorization {
users: [
{
user: "producer"
password: "prod-pass-2024"
permissions: {
publish: ["orders.>", "users.>"]
subscribe: []
}
},
{
user: "consumer"
password: "cons-pass-2024"
permissions: {
publish: []
subscribe: ["orders.>", "system.health"]
}
},
{
user: "admin"
password: "admin-pass-2024"
permissions: {
publish: [">"]
subscribe: [">"]
}
}
]
}
10.3 TLS/mTLS
# 1. CA
openssl genrsa -out ca.key 4096
openssl req -new -x509 -days 365 -key ca.key -out ca.crt
# 2.
openssl genrsa -out server.key 4096
openssl req -new -key server.key -out server.csr -subj "/CN=nats-server"
openssl x509 -req -days 365 -in server.csr -CA ca.crt -CAkey ca.key -out server.crt
# 3. mTLS
openssl genrsa -out client.key 4096
openssl req -new -key client.key -out client.csr -subj "/CN=nats-client"
openssl x509 -req -days 365 -in client.csr -CA ca.crt -CAkey ca.key -out client.crt
NATS TLS
# nats.conf
tls {
cert_file: "/etc/nats/tls/server.crt"
key_file: "/etc/nats/tls/server.key"
ca_file: "/etc/nats/tls/ca.crt"
# mTLS
verify: true
#
# cipher_suites: ["TLS_AES_256_GCM_SHA384"]
# TLS
# tls_min_version: "tls1.2"
}
# TLS
port: 4222
TLS
nc, err := nats.Connect("tls://localhost:4222",
nats.ClientCert("/etc/nats/tls/client.crt", "/etc/nats/tls/client.key"),
nats.RootCAs("/etc/nats/tls/ca.crt"),
)
10.4 JWT
NATS JWT SaaS
# nsc JWTNATS Security CLI
# go install github.com/nats-io/nsc/cmd/nsc@latest
#
nsc init
# Operator
nsc add operator --name MyOperator
# Account
nsc add account --name TenantA
nsc add account --name TenantB
# User
nsc add user --account TenantA --name app-a1
# JWT Seed
nsc generate creds --account TenantA --user app-a1
# JWT
nc, err := nats.Connect("nats://localhost:4222",
nats.UserCredentials("app-a1.creds"),
)
11. NATS
11.1
# Linux /etc/sysctl.conf
# 1.
fs.file-max = 1000000
# 2.
net.core.somaxconn = 65535
net.core.netdev_max_backlog = 65535
net.ipv4.tcp_max_syn_backlog = 65535
# 3. TCP
net.core.rmem_max = 16777216
net.core.wmem_max = 16777216
net.ipv4.tcp_rmem = 4096 87380 16777216
net.ipv4.tcp_wmem = 4096 65536 16777216
# 4. TCP
net.ipv4.tcp_tw_reuse = 1
net.ipv4.tcp_fin_timeout = 30
# 5. Swap
vm.swappiness = 0
#
sysctl -p
11.2 NATS
# nats.conf
#
max_connections: 65536
max_control_line: 4096
max_payload: 1048576 # 1MB
# JetStream
jetstream {
store_dir: "/var/lib/nats/jetstream"
# 50-70%
max_memory_store: 8GB
#
max_file_store: 100GB
# SSD
# store_dir: "/mnt/nvme/nats/jetstream"
#
# sync_interval: "0s" # fsync
# sync_interval: "1s" # fsync
}
#
http_port: 8222
11.3
// Go
nc, err := nats.Connect("nats://localhost:4222",
//
nats.MaxReconnects(10),
nats.ReconnectWait(100 * time.Millisecond),
nats.Timeout(2 * time.Second),
//
nats.DontRandomize(),
// Pending
nats.MaxPendingMsgs(65536),
nats.MaxPendingBytes(64 * 1024 * 1024), // 64MB
)
11.4 JetStream
// Batch Publish
func batchPublish(js nats.JetStreamContext, orders []*Order) error {
// Ack
ackFutures := make([]nats.PubAckFuture, 0, len(orders))
for _, order := range orders {
data, _ := json.Marshal(order)
future, err := js.PublishAsync("orders.new", data, nats.MsgId(fmt.Sprintf("order-%s", order.ID)))
if err != nil {
return err
}
ackFutures = append(ackFutures, future)
}
// Ack
for _, future := range ackFutures {
select {
case ack := <-future.Ok():
fmt.Printf(" Ack: stream=%s, seq=%d\n", ack.Stream, ack.Seq)
case err := <-future.Err():
return fmt.Errorf("publish failed: %w", err)
case <-time.After(5 * time.Second):
return fmt.Errorf("publish timeout")
}
}
return nil
}
11.5
# nats-bench
#
go install github.com/nats-io/nats.go/examples/nats-bench@latest
# Pub/Sub
nats-bench -n 1000000 -np 10 -ns 10 "orders.new"
# -n:
# -np:
# -ns:
# JetStream
nats-bench -js -n 100000 -np 5 -ns 5 "orders.new"
# -js: JetStream
#
# Pub stats: 10,000,000 msgs in 2.31 sec, 4,329,004 msgs/sec, 418.46 MB/sec
# Sub stats: 10,000,000 msgs in 2.45 sec, 4,081,632 msgs/sec, 394.50 MB/sec
12. NATS vs Kafka vs RabbitMQ vs Pulsar
12.1
| NATS | Kafka | RabbitMQ | Pulsar | |
|---|---|---|---|---|
| Erlang | ||||
| JetStream | ||||
| Pub/SubQueue | Exchange | |||
| Partition | Queue | Topic | ||
| JetStream |
12.2
msg/s
NATS (Core) 10M+
NATS (JS) 5M
Kafka 3M
RabbitMQ 500K
Pulsar 2M
P99
NATS (Core) 1ms
NATS (JS) 5ms
Kafka 50ms
RabbitMQ 20ms
Pulsar 15ms
12.3
** NATS**
- / Kubernetes
- IoT
- Pub/Sub Request/Reply
** Kafka**
- Event Sourcing
- Kafka Kafka ConnectKSQL
** RabbitMQ**
- HeadersTopic Exchange
- RabbitMQ
- Erlang/Elixir
** Pulsar**
- SaaS
13.
13.1
NATS http_port: 8222
#
curl http://localhost:8222/varz
#
curl http://localhost:8222/connz
# Subject
curl http://localhost:8222/subsz
# JetStream
curl http://localhost:8222/jsz
#
curl http://localhost:8222/healthz
varz:
- connections:
- total_msgs_in:
- total_msgs_out:
- total_bytes_in:
- total_bytes_out:
- cpu: CPU
- mem:
jetstream:
- streams: Stream
- consumers: Consumer
- messages:
- bytes:
- reserved_memory:
13.2 Prometheus
# prometheus-alerts.yml
groups:
- name: nats
rules:
- alert: NATSConnectionHigh
expr: nats_connections > 10000
for: 5m
labels:
severity: warning
annotations:
summary: "NATS connections too high: {{ $value }}"
- alert: NATSJetStreamStorageHigh
expr: nats_jetstream_used_bytes / nats_jetstream_reserved_bytes > 0.8
for: 10m
labels:
severity: warning
annotations:
summary: "JetStream storage usage > 80%"
- alert: NATSConsumerLagHigh
expr: nats_jetstream_consumer_pending_messages > 10000
for: 5m
labels:
severity: warning
annotations:
summary: "Consumer lag too high: {{ $value }} pending messages"
13.3
** 1**
1. Subject
- orders.new
- order.new ←
2. JetStream Stream
nats jetstream stream list
3. Consumer
nats jetstream consumer list ORDERS
4. MaxMsgsMaxBytesMaxAge
nats jetstream stream info ORDERS
5. Ack AckNone
** 2**
pending_messages
1.
nats jetstream consumer info ORDERS order-processor
2.
-
-
3. Ack
NATS /var/log/nats/nats-server.log
4. MaxAckPending
MaxAckPending
** 3**
1. 6222
telnet 192.168.1.11 6222
2. routes
3. Raft JetStream
nats jetstream raft list
4.
nats server request --server nats://localhost:4222 "jetstream|v1|raft|stepdown"
13.4
# JetStream
# 1 NATS
systemctl stop nats
tar -czf nats-backup-$(date +%Y%m%d).tar.gz /var/lib/nats/jetstream
systemctl start nats
# 2 nats CLI Stream
nats jetstream stream backup ORDERS /backup/ORDERS
#
nats jetstream stream restore ORDERS /backup/ORDERS
14. NATS
14.1
**NATS Kafka **
- Kafka
- NATS
JetStream NATS +
- 20MB
- Kubernetes
- GoJavaPythonRustC#JavaScript 60+
14.2
DO:
- Subjectshop.orders.created
- JetStream Replicas=3
- TLS/mTLS
- MsgId
- pending_messages
DON'T:
- NATS >30
- Ack
- >1MB
- Core NATS JetStream
14.3 NATS
- NATS 3.0
- **MQTT ** IoT
- **WebSocket ** NATS
- **WebAssembly ** NATS Wasm
GitHub: https://github.com/yourorg/nats-deep-dive-2026
go-producer/
go-consumer/
python-examples/
java-springboot/
docker-compose.yml
k8s-manifests/
****NATS NATS NATS
- NATS Official Documentation
- JetStream Design Document
- NATS Blog: Performance Tuning
- Cloud Native Patterns with NATS
- 2026 6 NATS v2.10.20 *