
Workers Architecture - 12 Specialized Workers

A system of specialized workers for distributed processing, synchronization, and automation in the Nerdistan CDP.

Overview

Status: ✅ 100% operational (updated October 2, 2025)
Repository: nerdistan-worker
Deployment: Railway Platform
Total workers: 12 specialized workers

General Architecture

graph TB
subgraph Railway["Railway Platform"]
W1[vtex-sync-worker]
W2[daily-sync-worker]
W3[product-sync-worker]
W4[facebook-automation]
W5[analytics-worker]
W6[scheduler]
W7[smart-sync-worker]
W8[historical-sync-service]
W9[google-ads-worker]
W10[vtex-feed-v3-worker]
W11[dynamic-collections-worker]
W12[legacy-optimization-suite]
end

subgraph Database["PostgreSQL Database"]
DB[(CDP Database)]
end

subgraph External["External APIs"]
VTEX[VTEX API]
FB[Facebook Ads API]
GA[Google Ads API]
end

W1 --> DB
W2 --> DB
W3 --> DB
W4 --> DB
W5 --> DB
W7 --> DB
W8 --> DB
W9 --> DB
W10 --> DB
W11 --> DB
W12 --> DB

W1 --> VTEX
W2 --> VTEX
W3 --> VTEX
W4 --> FB
W9 --> GA
W10 --> VTEX

W6 -.schedule.-> W2
W6 -.schedule.-> W3
W6 -.schedule.-> W4
W6 -.schedule.-> W5

Specialized Workers

1. VTEX Sync Worker

Entry Point: start_vtex_sync.py
Purpose: Direct VTEX synchronization for new and priority tenants

Features:

  • Processes tenants with ID >= 100 (new tenants)
  • Priority tenants: [20, 25, 28, 110]
  • Data range: 2023 onwards
  • Sync modes: initial / incremental

Variables:

DATABASE_URL=auto
SYNC_MODE=initial/incremental
DAYS_BACK=30
TZ=America/Argentina/Buenos_Aires
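Taken together, the tenant rules and the SYNC_MODE / DAYS_BACK variables suggest selection logic roughly like the following. This is a minimal sketch, not the code in start_vtex_sync.py: the function and constant names are hypothetical, and we assume that "tenants >= 100" refers to tenant IDs and that incremental mode looks back DAYS_BACK days (never earlier than 2023).

```python
from datetime import date, timedelta

# Hypothetical constants mirroring the rules described above.
NEW_TENANT_MIN_ID = 100            # assumption: tenant IDs >= 100 are "new" tenants
PRIORITY_TENANTS = {20, 25, 28, 110}
HISTORY_START = date(2023, 1, 1)   # "Data range: 2023 onwards"


def select_tenants(tenant_ids):
    """Return the tenants this worker should process, priority tenants first."""
    selected = {t for t in tenant_ids if t >= NEW_TENANT_MIN_ID or t in PRIORITY_TENANTS}
    # Sort key puts priority tenants first, then everything else by ascending ID.
    return sorted(selected, key=lambda t: (t not in PRIORITY_TENANTS, t))


def sync_start_date(sync_mode, today, days_back=30):
    """First day to request from VTEX for the given SYNC_MODE (hypothetical helper)."""
    if sync_mode == "initial":
        return HISTORY_START
    if sync_mode == "incremental":
        # Never go earlier than the documented 2023 lower bound.
        return max(HISTORY_START, today - timedelta(days=days_back))
    raise ValueError(f"unknown SYNC_MODE: {sync_mode!r}")
```

For example, `select_tenants([5, 20, 99, 100, 110, 150, 28])` keeps the priority tenants 20, 28, and 110 first and then the new tenants 100 and 150.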

Railway Service: vtex-sync-worker

Metrics:

  • Active tenants: ~35
  • Daily sync: ~5,000 orders
  • Performance: under 10 minutes per tenant

2. Daily Sync Worker

Entry Point: start_daily_sync.py
Purpose: Scheduled daily synchronization for all active tenants

Features:

  • Schedule: 6:00 AM and 6:00 PM (Argentina time)
  • Data range: last 48 hours
  • All active tenants
  • Incremental sync only
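Because the worker runs every 12 hours but requests the last 48 hours, each order is fetched several times, so writes must be idempotent. A minimal sketch of that idea, assuming each order carries hypothetical `order_id` and `last_change` fields; the real worker presumably upserts into PostgreSQL rather than a dict:

```python
from datetime import datetime, timedelta

LOOKBACK = timedelta(hours=48)  # "Data range: last 48 hours"


def sync_window(now: datetime) -> tuple[datetime, datetime]:
    """Return the (start, end) interval requested on each run."""
    return now - LOOKBACK, now


def merge_orders(existing, fetched):
    """Upsert fetched orders into `existing`, keyed by order ID.

    Orders are dicts with hypothetical keys "order_id" and "last_change"
    (a datetime). The newest version of an order wins, so re-fetching the
    same order on overlapping runs is harmless.
    """
    for order in fetched:
        current = existing.get(order["order_id"])
        if current is None or order["last_change"] >= current["last_change"]:
            existing[order["order_id"]] = order
    return existing
```

In PostgreSQL the same rule maps naturally onto `INSERT ... ON CONFLICT (order_id) DO UPDATE`, assuming a unique constraint on the order ID.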

Variables:

DATABASE_URL=auto
RUN_MODE=scheduler/immediate
TZ=America/Argentina/Buenos_Aires

Railway Service: daily-sync-worker

Metrics:

  • Tenants processed: 41
  • Orders/day: ~8,000-12,000
  • Duration: ~2-3 hours

3. Product Sync Worker

Entry Point: start_product_sync.py
Purpose: Full product catalog synchronization

Features:

  • Schedule: 5:00 AM Argentina time (daily)
  • Full catalog + SKUs
  • All active tenants
  • Full catalog refresh

Variables:

DATABASE_URL=auto
RUN_MODE=scheduler/immediate
TZ=America/Argentina/Buenos_Aires

Railway Service: product-sync-worker

Metrics:

  • Products synced: ~50,000
  • SKUs synced: ~200,000
  • Duration: ~1-2 hours

4. Facebook Automation Worker

Entry Point: start_facebook_automation.py
Purpose: Automated synchronization of Facebook Ads audiences

Features:

  • Schedule: every 48 hours (10:00 AM Argentina time)
  • 3 systems: Kangoo, Septimo, DigitalFarma
  • 7 active tenants
  • Database-driven configuration

Variables:

DATABASE_URL=auto
FACEBOOK_ACCESS_TOKEN=required
RUN_MODE=scheduler/immediate
TZ=America/Argentina/Buenos_Aires

Railway Service: facebook-automation

Metrics:

  • Customers synced: 90,241
  • Ad accounts: 3
  • Duration: ~30-45 minutes

Documentation: Facebook Ads Integration


5. Analytics Worker

Entry Point: start_analytics_worker.py
Purpose: RFM, CLV, and churn calculations for all tenants

Features:

  • CDP analytics processing
  • RFM segmentation (11 segments)
  • CLV prediction (1y, 3y, 5y)
  • Churn risk analysis
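The mapping onto the 11 RFM segments is not described here, so the sketch below stops at the per-customer R, F, and M scores that such segments are usually built from. Quintile (1-5) scoring is a common convention we assume, not a confirmed detail of the analytics worker; the customer field names are hypothetical.

```python
from datetime import date


def quintile_scores(values, higher_is_better=True):
    """Map each value to a 1-5 score by its rank within the population.

    Ties are broken by position, so equal values can land in adjacent scores.
    """
    n = len(values)
    # Order indices from worst to best value.
    order = sorted(range(n), key=lambda i: values[i], reverse=not higher_is_better)
    scores = [0] * n
    for rank, i in enumerate(order):
        # Scale ranks 0..n-1 onto scores 1..5.
        scores[i] = rank * 5 // n + 1
    return scores


def rfm_scores(customers, today: date):
    """customers: dicts with hypothetical keys last_order (date), orders, revenue."""
    recency_days = [(today - c["last_order"]).days for c in customers]
    r = quintile_scores(recency_days, higher_is_better=False)  # fewer days is better
    f = quintile_scores([c["orders"] for c in customers])
    m = quintile_scores([c["revenue"] for c in customers])
    return [f"{a}{b}{c}" for a, b, c in zip(r, f, m)]
```

At the scale listed below (~150,000 customers) the same scoring would more likely run in SQL with PostgreSQL's `NTILE(5) OVER (ORDER BY ...)` window function.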

Variables:

DATABASE_URL=auto
ANALYTICS_MODE=rfm/clv/churn/all
RUN_MODE=scheduler/immediate

Railway Service: analytics-worker

Metrics:

  • Customers processed: 149,522+
  • RFM segments: 11
  • Duration: ~45-60 minutes

6. Scheduler

Entry Point: start_scheduler.py
Purpose: Central coordinator for scheduled tasks

Features:

  • Coordinates all scheduled workers
  • Execution monitoring
  • Error handling and alerting
  • Health checks

Variables:

DATABASE_URL=auto
SCHEDULER_MODE=production
TZ=America/Argentina/Buenos_Aires

Railway Service: scheduler

Schedules:

  • 05:00 AM → Product Sync
  • 06:00 AM → Daily Sync
  • 10:00 AM (every 48h) → Facebook Automation
  • 06:00 PM → Daily Sync
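The schedule can be expressed as data plus a next-run computation. This is a minimal sketch, not start_scheduler.py: it assumes a fixed UTC-3 offset (Argentina does not currently observe daylight saving time, so this matches America/Argentina/Buenos_Aires; `zoneinfo.ZoneInfo` is the more general choice), and the helper names are hypothetical.

```python
from datetime import datetime, time, timedelta, timezone

# Assumption: fixed UTC-3, equivalent to America/Argentina/Buenos_Aires without DST.
ART = timezone(timedelta(hours=-3))

# (local time, Railway service) for the jobs that run every day.
DAILY_JOBS = [
    (time(5, 0), "product-sync-worker"),
    (time(6, 0), "daily-sync-worker"),
    (time(18, 0), "daily-sync-worker"),
]
FACEBOOK_TIME = time(10, 0)
FACEBOOK_EVERY = timedelta(hours=48)


def next_daily_run(now):
    """Return (run_at, service) for the first daily job strictly after `now`."""
    local = now.astimezone(ART)
    for day_offset in (0, 1):
        day = local.date() + timedelta(days=day_offset)
        for at, service in DAILY_JOBS:
            run_at = datetime.combine(day, at, tzinfo=ART)
            if run_at > local:
                return run_at, service
    raise RuntimeError("unreachable: a daily job always falls within 24 hours")


def facebook_due(now, last_run):
    """True once 48 hours have passed since last_run and it is 10:00 or later locally."""
    local = now.astimezone(ART)
    return local - last_run >= FACEBOOK_EVERY and local.time() >= FACEBOOK_TIME
```

Both `now` and `last_run` must be timezone-aware datetimes; comparisons between aware values work on the underlying instant, regardless of offset.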

7. Smart Sync Worker

Entry Point: main.py (WORKER_MODE=sync)
Purpose: Smart synchronization with advanced logic

Features:

  • Legacy support
  • Historical sync capabilities
  • Intelligent retry logic
  • Multi-mode operation

Variables:

DATABASE_URL=auto
WORKER_MODE=sync
SYNC_MODE=auto

Railway Service: smart-sync-worker

Metrics:

  • Tenants: 32 active
  • Performance: optimized
  • Reliability: 99.9%

8. Historical Sync Service

Entry Point: start_historical_sync.py
Purpose: Processing of complete historical data

Features:

  • Syncs data from 2020 onwards
  • Backfill of missing data
  • Large dataset handling
  • Batch processing

Variables:

DATABASE_URL=auto
START_YEAR=2020
END_YEAR=2024
BATCH_SIZE=1000
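One way to make a multi-year backfill restartable is to split START_YEAR..END_YEAR into month-sized windows and process each in BATCH_SIZE chunks. A minimal sketch, assuming END_YEAR is inclusive and monthly windows (both assumptions, not documented behavior); the helper name is hypothetical:

```python
from datetime import date


def monthly_windows(start_year, end_year):
    """Yield half-open (first_day, first_day_of_next_month) windows, end_year inclusive."""
    for year in range(start_year, end_year + 1):
        for month in range(1, 13):
            start = date(year, month, 1)
            end = date(year + 1, 1, 1) if month == 12 else date(year, month + 1, 1)
            yield start, end
```

Half-open windows (`start <= order_date < end`) ensure that no record is counted twice at month boundaries, and a failed month can be retried on its own.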

Railway Service: historical-sync-service


9. Google Ads Worker

Entry Point: start_google_ads_worker.py
Purpose: Full Google Ads Customer Match API integration

Features:

  • 108 materialized views
  • Automated audiences
  • SHA-256 hashing (privacy)
  • Multi-tenant support
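Customer Match expects identifiers to be normalized before SHA-256 hashing, so that the same email always produces the same digest. A minimal sketch covering only trimming and lowercasing of emails; Google's documentation specifies further rules (for example, phone numbers in E.164 format), and the helper names are hypothetical:

```python
import hashlib


def normalize_email(email):
    """Trim surrounding whitespace and lowercase (minimal normalization)."""
    return email.strip().lower()


def hash_identifier(email):
    """Return the hex-encoded SHA-256 digest of a normalized email."""
    return hashlib.sha256(normalize_email(email).encode("utf-8")).hexdigest()
```

Hashing happens before the data leaves the worker, so plaintext emails are never sent to the ads API.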

Variables:

DATABASE_URL=auto
GOOGLE_ADS_DEVELOPER_TOKEN=required
GOOGLE_ADS_CLIENT_ID=required
GOOGLE_ADS_CLIENT_SECRET=required
GOOGLE_ADS_REFRESH_TOKEN=required

Railway Service: google-ads-worker

Documentation: Google Ads Customer Match


10. VTEX Feed V3 Worker

Entry Point: start_vtex_feed_v3.py
Purpose: VTEX Feed V3 activation and incremental processing

Features:

  • Proven legacy system
  • Feed v3 API support
  • Incremental updates
  • High performance

Variables:

DATABASE_URL=auto
VTEX_APP_KEY=required
VTEX_APP_TOKEN=required
FEED_MODE=activation/incremental

Railway Service: vtex-feed-v3-worker


11. Dynamic Collections Worker

Entry Point: start_dynamic_collections.py
Purpose: Dynamic collections based on CDP patterns

Features:

  • RFM-based collections
  • CLV-based collections
  • Automated updates
  • Multi-tenant

Variables:

DATABASE_URL=auto
COLLECTION_MODE=rfm/clv/churn/custom
UPDATE_FREQUENCY=daily

Railway Service: dynamic-collections-worker


12. Legacy Optimization Suite

Entry Point: main.py (WORKER_MODE=legacy_optimization)
Purpose: Complete suite of legacy optimizations

Features:

  • Multi-tenant configuration
  • Global rate limiting
  • Enhanced identity resolution
  • Incremental sync optimizer
  • Error recovery system
  • VTEX API optimizer

Variables:

DATABASE_URL=auto
WORKER_MODE=legacy_optimization
RUN_MODE=setup/full/maintenance

Railway Service: legacy-optimization-suite

Status: 2/6 systems active (Oct 2025)

Deployment: September 28, 2025. Emergency recovery completed successfully.


Deployment Strategy

Railway Services

Each worker is an independent service on Railway:

# List services
railway service list

# View a service's logs
railway logs --service facebook-automation

# Restart a service
railway service restart vtex-sync-worker

Environment Variables

Variables shared by all workers:

# Database (set automatically by Railway)
DATABASE_URL=postgresql://...

# Timezone
TZ=America/Argentina/Buenos_Aires

# Execution mode
RUN_MODE=scheduler/immediate

# Worker-specific mode
WORKER_MODE=sync/analytics/facebook/etc

Health Checks

Each worker exposes a health endpoint:

# Health check
curl https://vtex-sync-worker.railway.app/health

# Response
{
  "status": "healthy",
  "worker": "vtex-sync-worker",
  "last_run": "2024-10-02T10:30:00Z",
  "next_run": "2024-10-02T18:00:00Z"
}
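A worker can serve this response with nothing but the standard library. A minimal sketch, assuming the service listens on the PORT variable that Railway typically provides (falling back to 8080) and that the worker updates the hypothetical `STATE` dict after each run:

```python
import json
import os
import threading
from http.server import BaseHTTPRequestHandler, HTTPServer

# Hypothetical in-memory state; the worker would refresh it after each run.
STATE = {
    "status": "healthy",
    "worker": "vtex-sync-worker",
    "last_run": None,
    "next_run": None,
}


class HealthHandler(BaseHTTPRequestHandler):
    def do_GET(self):
        if self.path != "/health":
            self.send_error(404)
            return
        body = json.dumps(STATE).encode("utf-8")
        self.send_response(200)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, format, *args):
        pass  # keep per-request access logs out of the worker's output


def start_health_server(port=None):
    """Serve /health from a daemon thread so the worker's main loop keeps running."""
    if port is None:
        port = int(os.environ.get("PORT", "8080"))
    server = HTTPServer(("0.0.0.0", port), HealthHandler)
    threading.Thread(target=server.serve_forever, daemon=True).start()
    return server
```

Running the server on a daemon thread means it stops automatically when the worker process exits.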

Monitoring

Database Monitoring

-- Worker status
SELECT
worker_name,
last_execution,
next_execution,
status,
error_count
FROM worker_status
ORDER BY last_execution DESC;

Railway Logs

# Stream logs in real time
railway logs --service facebook-automation --follow

# Filter errors
railway logs --service vtex-sync-worker | grep ERROR

# Last 100 lines
railway logs --service daily-sync-worker --tail 100

Metrics Dashboard

Key metrics to monitor:

| Metric | Target | Alert threshold |
|---|---|---|
| Sync success rate | > 95% | < 90% |
| Average sync time | < 2 hours | > 4 hours |
| Error rate | < 5% | > 10% |
| API rate limit usage | < 80% | > 90% |
| Database connections | < 50 | > 80 |
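The table leaves a gap between each target and its alert threshold. A minimal sketch that classifies a reading using the table's thresholds; the metric keys are hypothetical, and treating the in-between band as "warning" is our interpretation:

```python
# metric -> (on-target check, alert check), using the thresholds from the table above.
THRESHOLDS = {
    "sync_success_rate_pct": (lambda v: v > 95, lambda v: v < 90),
    "avg_sync_time_hours": (lambda v: v < 2, lambda v: v > 4),
    "error_rate_pct": (lambda v: v < 5, lambda v: v > 10),
    "api_rate_limit_usage_pct": (lambda v: v < 80, lambda v: v > 90),
    "db_connections": (lambda v: v < 50, lambda v: v > 80),
}


def classify(metric, value):
    """Return "ok", "warning", or "alert" for one metric reading."""
    on_target, alert = THRESHOLDS[metric]
    if alert(value):
        return "alert"
    if on_target(value):
        return "ok"
    return "warning"  # between the target and the alert threshold
```

For example, a 92% sync success rate is below target but above the alert threshold, so it classifies as "warning".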

Error Handling

Retry Strategy

All workers implement retries with exponential backoff:

from tenacity import retry, stop_after_attempt, wait_exponential

@retry(
    stop=stop_after_attempt(3),                         # give up after 3 attempts
    wait=wait_exponential(multiplier=1, min=4, max=10)  # exponential wait, clamped to 4-10 s
)
def sync_with_retry():
    # Worker logic here
    pass

Error Logging

-- Create the error log table
CREATE TABLE worker_errors (
id SERIAL PRIMARY KEY,
worker_name VARCHAR(100),
error_type VARCHAR(100),
error_message TEXT,
stack_trace TEXT,
tenant_id INTEGER,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- Recent errors (last 24 hours)
SELECT
worker_name,
error_type,
COUNT(*) as occurrences,
MAX(created_at) as last_occurrence
FROM worker_errors
WHERE created_at > CURRENT_TIMESTAMP - INTERVAL '24 hours'
GROUP BY worker_name, error_type
ORDER BY occurrences DESC;
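To fill this table from a worker, a caught exception can be turned into one row's column values. A minimal sketch; `error_row` and `INSERT_ERROR_SQL` are hypothetical names, and the statement uses psycopg2's named `%(name)s` placeholders so it can be run as `cursor.execute(INSERT_ERROR_SQL, error_row(...))`:

```python
import traceback


def error_row(worker_name, exc, tenant_id=None):
    """Build the column values for one worker_errors row from a caught exception."""
    return {
        "worker_name": worker_name,
        "error_type": type(exc).__name__,
        "error_message": str(exc),
        "stack_trace": "".join(traceback.format_exception(type(exc), exc, exc.__traceback__)),
        "tenant_id": tenant_id,
    }


# Parameterized insert; created_at is filled by the column default.
INSERT_ERROR_SQL = """
    INSERT INTO worker_errors (worker_name, error_type, error_message, stack_trace, tenant_id)
    VALUES (%(worker_name)s, %(error_type)s, %(error_message)s, %(stack_trace)s, %(tenant_id)s)
"""
```

Passing values as parameters rather than formatting them into the SQL string keeps error messages containing quotes from breaking the insert.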

Performance Optimization

Database Connection Pooling

# connection_pool.py
import os

import psycopg2.pool

# Railway injects DATABASE_URL; psycopg2 accepts it as a libpq connection string.
connection_pool = psycopg2.pool.ThreadedConnectionPool(
    minconn=1,
    maxconn=10,
    dsn=os.environ["DATABASE_URL"],
)
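A pooled connection must always be returned with `putconn`, even when the work fails, or the pool eventually runs dry. A minimal sketch of a context manager around `connection_pool`; the helper name is hypothetical, and `with conn:` relies on psycopg2's connection context manager, which commits on success and rolls back on an exception without closing the connection:

```python
from contextlib import contextmanager


@contextmanager
def pooled_connection(pool):
    """Borrow a connection from the pool and always give it back."""
    conn = pool.getconn()
    try:
        with conn:  # psycopg2: commit on success, roll back on exception
            yield conn
    finally:
        pool.putconn(conn)
```

Usage: `with pooled_connection(connection_pool) as conn:` followed by the cursor work.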

Batch Processing

# Process in batches of 1,000
BATCH_SIZE = 1000

for i in range(0, len(data), BATCH_SIZE):
    batch = data[i:i + BATCH_SIZE]
    process_batch(batch)
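The slicing loop requires a list with a known length. For generators or database cursors, the same batching can be done lazily. A minimal sketch with a hypothetical `batched` helper (Python 3.12 ships `itertools.batched`, which yields tuples instead of lists):

```python
from itertools import islice


def batched(iterable, size):
    """Yield lists of up to `size` items without loading the whole input into memory."""
    if size < 1:
        raise ValueError("size must be >= 1")
    it = iter(iterable)
    while True:
        batch = list(islice(it, size))
        if not batch:
            return
        yield batch
```

For example, `for batch in batched(rows, 1000): process_batch(batch)` works whether `rows` is a list or a streaming cursor.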

Caching

from functools import lru_cache

@lru_cache(maxsize=100)
def get_tenant_config(tenant_id):
    # Cache tenant configurations (up to 100 tenants; no time-based expiry)
    return fetch_config(tenant_id)
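`lru_cache` entries never expire, so a tenant configuration changed in the database is not seen until the worker restarts or calls `get_tenant_config.cache_clear()`. When that matters, a time-to-live cache is one alternative. A minimal sketch; `ttl_cache` is a hypothetical helper, and the injectable clock exists only to make it testable:

```python
import functools
import time


def ttl_cache(seconds, clock=time.monotonic):
    """Cache a one-argument function's results for `seconds`, then refetch."""
    def decorator(func):
        entries = {}  # key -> (fetched_at, value)

        @functools.wraps(func)
        def wrapper(key):
            now = clock()
            hit = entries.get(key)
            if hit is not None and now - hit[0] < seconds:
                return hit[1]
            value = func(key)
            entries[key] = (now, value)
            return value

        wrapper.cache_clear = entries.clear
        return wrapper
    return decorator
```

With `@ttl_cache(300)`, configuration changes become visible within five minutes without restarting the service.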

Scalability

Adding a New Worker

  1. Create an entry point in the nerdistan-worker repo
  2. Configure variables in Railway
  3. Deploy as a new service
  4. Update the documentation
# Example: new email automation worker
railway service create email-automation-worker
railway variables --set "WORKER_MODE=email_automation"
railway up
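A new entry point (for example, a hypothetical start_email_automation.py) would typically honor the shared RUN_MODE convention: `immediate` runs once and exits, and `scheduler` keeps running. A minimal sketch; the fixed daily interval, `run_once`, and the `max_runs` test hook are assumptions, and a real scheduler mode would more likely sleep until a wall-clock time:

```python
import os
import time

INTERVAL_SECONDS = 24 * 60 * 60  # hypothetical: one run per day in scheduler mode


def run_once():
    print("email automation run")  # placeholder for the worker's real job


def main(environ=os.environ, sleep=time.sleep, max_runs=None):
    """Dispatch on RUN_MODE and return the number of runs performed."""
    mode = environ.get("RUN_MODE", "scheduler")
    if mode == "immediate":
        run_once()
        return 1
    if mode != "scheduler":
        raise SystemExit(f"Unsupported RUN_MODE: {mode!r}")
    runs = 0
    while max_runs is None or runs < max_runs:
        run_once()
        runs += 1
        sleep(INTERVAL_SECONDS)
    return runs


if __name__ == "__main__":
    main()
```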

Horizontal Scaling

Railway supports horizontal scaling:

# Scale to multiple instances
railway service scale email-automation-worker --replicas 3

Troubleshooting

Worker does not run

Diagnosis:

railway logs --service vtex-sync-worker | grep ERROR

Solutions:

  1. Check the environment variables
  2. Check DATABASE_URL
  3. Restart the service
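The first two checks can be automated at startup so a misconfigured service fails with a clear message instead of a later connection error. A minimal sketch; the per-service lists come from the Variables sections above, while the helper names and the DATABASE_URL-only default are assumptions:

```python
import os

# Required variables per Railway service, as listed in the worker sections above.
REQUIRED_VARS = {
    "facebook-automation": ["DATABASE_URL", "FACEBOOK_ACCESS_TOKEN"],
    "google-ads-worker": [
        "DATABASE_URL",
        "GOOGLE_ADS_DEVELOPER_TOKEN",
        "GOOGLE_ADS_CLIENT_ID",
        "GOOGLE_ADS_CLIENT_SECRET",
        "GOOGLE_ADS_REFRESH_TOKEN",
    ],
    "vtex-feed-v3-worker": ["DATABASE_URL", "VTEX_APP_KEY", "VTEX_APP_TOKEN"],
}


def missing_vars(service, environ=os.environ):
    """Return required variables that are unset or empty (default: DATABASE_URL only)."""
    required = REQUIRED_VARS.get(service, ["DATABASE_URL"])
    return [name for name in required if not environ.get(name)]


def database_url_looks_valid(url):
    """libpq accepts both the postgresql:// and postgres:// URI schemes."""
    return url.startswith(("postgresql://", "postgres://"))
```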

Degraded performance

Diagnosis:

-- Slow queries
SELECT
pid,
now() - pg_stat_activity.query_start AS duration,
query
FROM pg_stat_activity
WHERE state = 'active'
ORDER BY duration DESC;

Solutions:

  1. Optimize queries
  2. Add indexes
  3. Increase resources on Railway

Rate Limiting

Facebook/Google APIs:

  • Implement exponential backoff
  • Respect hourly limits
  • Spread requests over time
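The three guidelines can be combined as a sliding-window limiter for hourly quotas plus jittered exponential backoff for rejected requests. A minimal sketch; the limit value is an assumption (actual Facebook and Google quotas vary by account and endpoint), the class and function names are hypothetical, and "full jitter" (a random delay up to the exponential cap) is one common backoff variant:

```python
import random
import time
from collections import deque


class HourlyLimiter:
    """Sliding-window limiter: at most `limit` requests in any 3600-second window."""

    def __init__(self, limit, clock=time.monotonic):
        self.limit = limit
        self.clock = clock
        self.sent = deque()  # timestamps of recent requests, oldest first

    def wait_time(self):
        """Seconds to wait before the next request is allowed (0.0 if allowed now)."""
        now = self.clock()
        while self.sent and now - self.sent[0] >= 3600:
            self.sent.popleft()
        if len(self.sent) < self.limit:
            return 0.0
        return 3600 - (now - self.sent[0])

    def record(self):
        """Call after each request is sent."""
        self.sent.append(self.clock())


def backoff_delay(attempt, base=1.0, cap=60.0):
    """Full-jitter backoff: a random delay in [0, min(cap, base * 2**attempt)] seconds."""
    return random.uniform(0, min(cap, base * 2 ** attempt))
```

Jitter spreads retries from many tenants over time instead of having them all hit the API at the same moment.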

Next Steps

Resources