Workers Architecture - 12 Specialized Workers
Specialized worker system for distributed processing, synchronization, and automation of the Nerdistan CDP.
Overview
Status: ✅ 100% OPERATIONAL (updated October 2, 2025)
Repository: nerdistan-worker
Deployment: Railway Platform
Total Workers: 12 specialized workers
General Architecture
```mermaid
graph TB
    subgraph Railway["Railway Platform"]
        W1[vtex-sync-worker]
        W2[daily-sync-worker]
        W3[product-sync-worker]
        W4[facebook-automation]
        W5[analytics-worker]
        W6[scheduler]
        W7[smart-sync-worker]
        W8[historical-sync-service]
        W9[google-ads-worker]
        W10[vtex-feed-v3-worker]
        W11[dynamic-collections-worker]
        W12[legacy-optimization-suite]
    end
    subgraph Database["PostgreSQL Database"]
        DB[(CDP Database)]
    end
    subgraph External["External APIs"]
        VTEX[VTEX API]
        FB[Facebook Ads API]
        GA[Google Ads API]
    end
    W1 --> DB
    W2 --> DB
    W3 --> DB
    W4 --> DB
    W5 --> DB
    W7 --> DB
    W8 --> DB
    W9 --> DB
    W10 --> DB
    W11 --> DB
    W12 --> DB
    W1 --> VTEX
    W2 --> VTEX
    W3 --> VTEX
    W4 --> FB
    W9 --> GA
    W10 --> VTEX
    W6 -.schedule.-> W2
    W6 -.schedule.-> W3
    W6 -.schedule.-> W4
    W6 -.schedule.-> W5
```
Specialized Workers
1. VTEX Sync Worker
Entry Point: start_vtex_sync.py
Purpose: Direct VTEX synchronization for new and priority tenants
Features:
- Processes tenants with ID >= 100 (new tenants)
- Priority tenants: [20, 25, 28, 110]
- Data range: 2023 onwards
- Sync modes: initial / incremental
Variables:
```bash
DATABASE_URL=auto
SYNC_MODE=initial/incremental
DAYS_BACK=30
TZ=America/Argentina/Buenos_Aires
```
Railway Service: vtex-sync-worker
Metrics:
- Active tenants: ~35
- Daily sync volume: ~5,000 orders
- Performance: under 10 minutes per tenant
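The tenant rules above (tenants with ID >= 100, plus a fixed priority list) fit in a small selection function. This is a hypothetical sketch. `select_vtex_tenants` does not come from nerdistan-worker, and the sketch assumes priority tenants run first, in the listed order.

```python
PRIORITY_TENANTS = [20, 25, 28, 110]  # documented priority list
NEW_TENANT_MIN_ID = 100               # tenants with ID >= 100 count as "new"


def select_vtex_tenants(active_tenant_ids):
    """Hypothetical helper: tenants this worker would process, in processing order.

    Priority tenants come first (documented order, only if active), then the
    remaining new tenants (ID >= 100) in ascending order, without duplicates.
    """
    active = set(active_tenant_ids)
    ordered = [t for t in PRIORITY_TENANTS if t in active]
    seen = set(ordered)
    ordered += sorted(t for t in active if t >= NEW_TENANT_MIN_ID and t not in seen)
    return ordered


print(select_vtex_tenants([5, 20, 28, 101, 110, 150, 99]))  # -> [20, 28, 110, 101, 150]
```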
2. Daily Sync Worker
Entry Point: start_daily_sync.py
Purpose: Scheduled daily synchronization for all active tenants
Features:
- Schedule: 6:00 AM and 6:00 PM (Argentina time)
- Data range: last 48 hours
- All active tenants
- Incremental sync only
Variables:
```bash
DATABASE_URL=auto
RUN_MODE=scheduler/immediate
TZ=America/Argentina/Buenos_Aires
```
Railway Service: daily-sync-worker
Metrics:
- Tenants processed: 41
- Orders per day: ~8,000-12,000
- Duration: ~2-3 hours
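The daily sync pulls only the last 48 hours of data. Below is a minimal sketch of computing that incremental window in the worker's timezone. `incremental_window` is a hypothetical name. Argentina currently uses UTC-3 year-round, so the sketch uses a fixed offset to avoid depending on tzdata. `zoneinfo.ZoneInfo("America/Argentina/Buenos_Aires")` is the more robust choice where tzdata is available.

```python
from datetime import datetime, timedelta, timezone

# Argentina currently observes UTC-3 with no DST; zoneinfo is preferable when tzdata exists.
AR_TZ = timezone(timedelta(hours=-3), "ART")


def incremental_window(now=None, hours=48):
    """Return (start, end) timezone-aware datetimes covering the last `hours` hours."""
    end = datetime.now(AR_TZ) if now is None else now
    if end.tzinfo is None:
        raise ValueError("now must be timezone-aware")
    return end - timedelta(hours=hours), end


start, end = incremental_window(datetime(2025, 10, 2, 6, 0, tzinfo=AR_TZ))
print(start.isoformat(), "->", end.isoformat())
```

Runs happen every 12 hours and each covers 48 hours, so consecutive windows overlap. Each order is therefore fetched several times, and writes to the database need to be idempotent (upserts).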
3. Product Sync Worker
Entry Point: start_product_sync.py
Purpose: Full product catalog synchronization
Features:
- Schedule: 5:00 AM Argentina time (daily)
- Full catalog plus SKUs
- All active tenants
- Full catalog refresh
Variables:
```bash
DATABASE_URL=auto
RUN_MODE=scheduler/immediate
TZ=America/Argentina/Buenos_Aires
```
Railway Service: product-sync-worker
Metrics:
- Products synced: ~50,000
- SKUs synced: ~200,000
- Duration: ~1-2 hours
4. Facebook Automation Worker
Entry Point: start_facebook_automation.py
Purpose: Automatic synchronization of Facebook Ads audiences
Features:
- Schedule: every 48 hours (10:00 AM Argentina time)
- 3 systems: Kangoo, Septimo, DigitalFarma
- 7 active tenants
- Database-driven configuration
Variables:
```bash
DATABASE_URL=auto
FACEBOOK_ACCESS_TOKEN=required
RUN_MODE=scheduler/immediate
TZ=America/Argentina/Buenos_Aires
```
Railway Service: facebook-automation
Metrics:
- Customers synced: 90,241
- Ad accounts: 3
- Duration: ~30-45 minutes
Documentation: Facebook Ads Integration
5. Analytics Worker
Entry Point: start_analytics_worker.py
Purpose: RFM, CLV, and churn calculations for all tenants
Features:
- CDP analytics processing
- RFM segmentation (11 segments)
- CLV prediction (1-, 3-, and 5-year horizons)
- Churn risk analysis
Variables:
```bash
DATABASE_URL=auto
ANALYTICS_MODE=rfm/clv/churn/all
RUN_MODE=scheduler/immediate
```
Railway Service: analytics-worker
Metrics:
- Customers processed: 149,522+
- RFM segments: 11
- Duration: ~45-60 minutes
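RFM segmentation starts from per-customer recency, frequency, and monetary scores. The sketch below shows one common way to derive 1-5 scores: rank-based quintiles. It illustrates the general technique and is not the analytics worker's actual implementation. It also does not reproduce how scores map to the 11 named segments.

```python
def quintile_scores(values, higher_is_better=True):
    """Return a 1-5 score per value based on its rank among all values.

    Ties are broken by input position, so equal values can land in adjacent scores.
    With higher_is_better=False (e.g. days since last order), smaller values score higher.
    """
    n = len(values)
    # Sort indices from worst to best so that rank 0 gets score 1.
    order = sorted(range(n), key=lambda i: values[i], reverse=not higher_is_better)
    scores = [0] * n
    for rank, i in enumerate(order):
        scores[i] = 1 + rank * 5 // n
    return scores


def rfm_scores(customers):
    """customers: list of (customer_id, recency_days, frequency, monetary) tuples."""
    r = quintile_scores([c[1] for c in customers], higher_is_better=False)
    f = quintile_scores([c[2] for c in customers])
    m = quintile_scores([c[3] for c in customers])
    return {c[0]: (r[i], f[i], m[i]) for i, c in enumerate(customers)}


customers = [
    ("a", 5, 12, 900.0),
    ("b", 40, 3, 150.0),
    ("c", 200, 1, 20.0),
    ("d", 15, 6, 400.0),
    ("e", 90, 2, 60.0),
]
print(rfm_scores(customers))
```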
6. Scheduler
Entry Point: start_scheduler.py
Purpose: Central coordinator for scheduled tasks
Features:
- Coordinates all scheduled workers
- Execution monitoring
- Error handling and alerting
- Health checks
Variables:
```bash
DATABASE_URL=auto
SCHEDULER_MODE=production
TZ=America/Argentina/Buenos_Aires
```
Railway Service: scheduler
Schedules (Argentina time):
- 05:00 AM → Product Sync
- 06:00 AM → Daily Sync
- 06:00 PM → Daily Sync
- 10:00 AM (every 48h) → Facebook Automation
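The schedule above combines fixed daily slots with a 48-hour cadence. The sketch below models both and computes each job's next run from a given moment. Several pieces are illustrative assumptions, not the scheduler's real configuration: the job keys, the `FACEBOOK_ANCHOR` reference date used to phase the 48-hour cycle, and the fixed UTC-3 offset.

```python
from datetime import datetime, time, timedelta, timezone

# Argentina currently observes UTC-3 year-round (no DST).
AR_TZ = timezone(timedelta(hours=-3), "ART")

# Fixed daily slots, mirroring the documented schedule.
DAILY_JOBS = {
    "product_sync": [time(5, 0)],
    "daily_sync": [time(6, 0), time(18, 0)],
}

# Hypothetical reference run for the 48-hour job; a real scheduler would persist this.
FACEBOOK_ANCHOR = datetime(2025, 10, 1, 10, 0, tzinfo=AR_TZ)
FACEBOOK_EVERY = timedelta(hours=48)


def next_daily_run(now, times):
    """Earliest slot today or tomorrow (in now's timezone) strictly after `now`."""
    for day_offset in (0, 1):
        day = (now + timedelta(days=day_offset)).date()
        for t in sorted(times):
            candidate = datetime.combine(day, t, tzinfo=now.tzinfo)
            if candidate > now:
                return candidate
    raise ValueError("times must not be empty")


def next_periodic_run(now, anchor, every):
    """Smallest anchor + k * every (k >= 0) strictly after `now`."""
    if now < anchor:
        return anchor
    periods = (now - anchor) // every + 1  # timedelta // timedelta -> int
    return anchor + periods * every


def next_runs(now):
    runs = {name: next_daily_run(now, times) for name, times in DAILY_JOBS.items()}
    runs["facebook_automation"] = next_periodic_run(now, FACEBOOK_ANCHOR, FACEBOOK_EVERY)
    return runs


for job, when in next_runs(datetime(2025, 10, 2, 7, 30, tzinfo=AR_TZ)).items():
    print(job, when.isoformat())
```

Anchoring the 48-hour job to a stored reference run keeps the cadence stable across restarts. A plain "48 hours after the process started" rule would drift every time the service is redeployed.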
7. Smart Sync Worker
Entry Point: main.py (WORKER_MODE=sync)
Purpose: Synchronization with intelligent retries, historical sync, and multiple operating modes
Features:
- Legacy support
- Historical sync capabilities
- Intelligent retry logic
- Multi-mode operation
Variables:
```bash
DATABASE_URL=auto
WORKER_MODE=sync
SYNC_MODE=auto
```
Railway Service: smart-sync-worker
Metrics:
- Active tenants: 32
- Performance: optimized
- Reliability: 99.9%
8. Historical Sync Service
Entry Point: start_historical_sync.py
Purpose: Processing of complete historical data
Features:
- Syncs data from 2020 onwards
- Backfills missing data
- Large dataset handling
- Batch processing
Variables:
```bash
DATABASE_URL=auto
START_YEAR=2020
END_YEAR=2024
BATCH_SIZE=1000
```
Railway Service: historical-sync-service
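A backfill from 2020 through 2024 is easier to resume and monitor when it is split into bounded time windows, with each window processed in `BATCH_SIZE` chunks. Below is a hypothetical sketch of month-sized window generation. It reads the documented `START_YEAR` and `END_YEAR` variables and treats `END_YEAR` as inclusive, which is an assumption.

```python
import os
from datetime import date


def month_windows(start_year, end_year):
    """Yield half-open (first_day, first_day_of_next_month) pairs, Jan start_year to Dec end_year."""
    year, month = start_year, 1
    while year <= end_year:
        first = date(year, month, 1)
        year, month = (year + 1, 1) if month == 12 else (year, month + 1)
        yield first, date(year, month, 1)


start_year = int(os.environ.get("START_YEAR", "2020"))
end_year = int(os.environ.get("END_YEAR", "2024"))
windows = list(month_windows(start_year, end_year))
print(len(windows), windows[0], windows[-1])
```

The windows are half-open (`>= first AND < next`), so a record on the first day of a month belongs to exactly one window.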
9. Google Ads Worker
Entry Point: start_google_ads_worker.py
Purpose: Full Google Ads Customer Match API integration
Features:
- 108 materialized views
- Automatic audiences
- SHA-256 hashing (privacy)
- Multi-tenant support
Variables:
```bash
DATABASE_URL=auto
GOOGLE_ADS_DEVELOPER_TOKEN=required
GOOGLE_ADS_CLIENT_ID=required
GOOGLE_ADS_CLIENT_SECRET=required
GOOGLE_ADS_REFRESH_TOKEN=required
```
Railway Service: google-ads-worker
Documentation: Google Ads Customer Match
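The SHA-256 hashing listed under this worker's features means identifiers are normalized and hashed before upload. Below is a minimal sketch for email addresses. It assumes basic normalization only: trim whitespace, then lowercase. Google's Customer Match documentation defines further rules (for example, E.164 formatting for phone numbers), so treat this as illustrative. `hash_email` is a hypothetical helper name.

```python
import hashlib


def normalize_email(email: str) -> str:
    """Basic normalization: strip surrounding whitespace and lowercase."""
    return email.strip().lower()


def hash_email(email: str) -> str:
    """Hex-encoded SHA-256 digest of the normalized email (64 hex characters)."""
    return hashlib.sha256(normalize_email(email).encode("utf-8")).hexdigest()


print(hash_email("  Jane.Doe@Example.com "))
```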
10. VTEX Feed V3 Worker
Entry Point: start_vtex_feed_v3.py
Purpose: VTEX Feed V3 activation and incremental updates
Features:
- Legacy, proven system
- Feed v3 API support
- Incremental updates
- High performance
Variables:
```bash
DATABASE_URL=auto
VTEX_APP_KEY=required
VTEX_APP_TOKEN=required
FEED_MODE=activation/incremental
```
Railway Service: vtex-feed-v3-worker
11. Dynamic Collections Worker
Entry Point: start_dynamic_collections.py
Purpose: Dynamic collections based on CDP patterns
Features:
- RFM-based collections
- CLV-based collections
- Automated updates
- Multi-tenant
Variables:
```bash
DATABASE_URL=auto
COLLECTION_MODE=rfm/clv/churn/custom
UPDATE_FREQUENCY=daily
```
Railway Service: dynamic-collections-worker
12. Legacy Optimization Suite
Entry Point: main.py (WORKER_MODE=legacy_optimization)
Purpose: Complete suite of legacy optimizations
Features:
- Multi-tenant configuration
- Global rate limiting
- Enhanced identity resolution
- Incremental sync optimizer
- Error recovery system
- VTEX API optimizer
Variables:
```bash
DATABASE_URL=auto
WORKER_MODE=legacy_optimization
RUN_MODE=setup/full/maintenance
```
Railway Service: legacy-optimization-suite
Status: 2 of 6 systems active (October 2025)
Deployment: September 28, 2025 (emergency recovery completed successfully)
Deployment Strategy
Railway Services
Each worker runs as an independent service on Railway:
```bash
# List services
railway service list
# View a service's logs
railway logs --service facebook-automation
# Restart a service
railway service restart vtex-sync-worker
```
Environment Variables
Variables shared by all workers:
```bash
# Database (set automatically by Railway)
DATABASE_URL=postgresql://...
# Timezone
TZ=America/Argentina/Buenos_Aires
# Execution mode
RUN_MODE=scheduler/immediate
# Worker-specific mode
WORKER_MODE=sync/analytics/facebook/etc
```
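`RUN_MODE` takes one of a small set of values, and `DATABASE_URL` must be present. Validating both once at startup makes a misconfigured worker fail immediately instead of partway through a sync. Below is a standard-library sketch. The `WorkerConfig` dataclass and the default values are assumptions. The allowed run modes are a parameter because the Legacy Optimization Suite uses `setup/full/maintenance` rather than `scheduler/immediate`.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional

DEFAULT_RUN_MODES = ("scheduler", "immediate")


@dataclass(frozen=True)
class WorkerConfig:
    database_url: str
    tz: str
    run_mode: str
    worker_mode: Optional[str]


def load_worker_config(env: Optional[Mapping[str, str]] = None,
                       allowed_run_modes=DEFAULT_RUN_MODES) -> WorkerConfig:
    """Read and validate the shared variables; raise early instead of failing mid-sync."""
    env = os.environ if env is None else env
    database_url = env.get("DATABASE_URL")
    if not database_url:
        raise RuntimeError("DATABASE_URL is not set")
    # Assumption: when RUN_MODE is unset, fall back to the first allowed mode.
    run_mode = env.get("RUN_MODE", allowed_run_modes[0])
    if run_mode not in allowed_run_modes:
        raise RuntimeError(f"RUN_MODE must be one of {list(allowed_run_modes)}, got {run_mode!r}")
    return WorkerConfig(
        database_url=database_url,
        tz=env.get("TZ", "America/Argentina/Buenos_Aires"),
        run_mode=run_mode,
        worker_mode=env.get("WORKER_MODE"),
    )
```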
Health Checks
Each worker exposes a health endpoint:
```bash
# Health check
curl https://vtex-sync-worker.railway.app/health
```
Response:
```json
{
  "status": "healthy",
  "worker": "vtex-sync-worker",
  "last_run": "2024-10-02T10:30:00Z",
  "next_run": "2024-10-02T18:00:00Z"
}
```
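Below is a minimal sketch of a `/health` endpoint that returns the payload shown above, built on the standard library's `http.server`. How the workers actually track `last_run` and `next_run` is not documented here. The module-level `STATE` dict is a hypothetical stand-in, and reading the listening port from a `PORT` variable (default 8080) is an assumption.

```python
import json
import os
from http.server import BaseHTTPRequestHandler, HTTPServer

# Hypothetical in-memory state; a real worker would update it after each run.
STATE = {"worker": "vtex-sync-worker", "last_run": None, "next_run": None}


def health_payload(state):
    """Build the JSON body for /health from the worker's current state."""
    return {
        "status": "healthy",
        "worker": state["worker"],
        "last_run": state["last_run"],
        "next_run": state["next_run"],
    }


class HealthHandler(BaseHTTPRequestHandler):
    def do_GET(self):
        if self.path != "/health":
            self.send_error(404)
            return
        body = json.dumps(health_payload(STATE)).encode("utf-8")
        self.send_response(200)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)


def serve_health(port=None):
    """Serve /health forever; intended to run in a daemon thread next to the worker loop."""
    if port is None:
        port = int(os.environ.get("PORT", "8080"))
    HTTPServer(("0.0.0.0", port), HealthHandler).serve_forever()
```

A worker could start it with `threading.Thread(target=serve_health, daemon=True).start()` before entering its main loop.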
Monitoring
Database Monitoring
```sql
-- Worker status
SELECT
    worker_name,
    last_execution,
    next_execution,
    status,
    error_count
FROM worker_status
ORDER BY last_execution DESC;
```
Railway Logs
```bash
# Follow logs in real time
railway logs --service facebook-automation --follow
# Filter errors
railway logs --service vtex-sync-worker | grep ERROR
# Last 100 lines
railway logs --service daily-sync-worker --tail 100
```
Metrics Dashboard
Key metrics to monitor:
| Metric | Target | Alert Threshold |
|---|---|---|
| Sync Success Rate | > 95% | < 90% |
| Average Sync Time | < 2 hours | > 4 hours |
| Error Rate | < 5% | > 10% |
| API Rate Limit Usage | < 80% | > 90% |
| Database Connections | < 50 | > 80 |
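Each row pairs a target with an alert threshold, which leaves a band between the two. One way to encode this is a three-state check: ok when the target is met, alert past the threshold, and warning in between. The thresholds below are copied from the table. The metric keys and the "warning" label are assumptions of this sketch.

```python
# metric -> (better_direction, target, alert_threshold), values from the table above
THRESHOLDS = {
    "sync_success_rate_pct": ("higher", 95, 90),
    "avg_sync_time_hours": ("lower", 2, 4),
    "error_rate_pct": ("lower", 5, 10),
    "api_rate_limit_usage_pct": ("lower", 80, 90),
    "db_connections": ("lower", 50, 80),
}


def classify(metric, value):
    """Return "ok" if the target is met, "alert" past the alert threshold, else "warning"."""
    direction, target, alert = THRESHOLDS[metric]
    if direction == "higher":
        if value > target:
            return "ok"
        return "alert" if value < alert else "warning"
    if value < target:
        return "ok"
    return "alert" if value > alert else "warning"


print(classify("sync_success_rate_pct", 92))  # -> warning
```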
Error Handling
Retry Strategy
All workers implement retries with exponential backoff:
```python
from tenacity import retry, stop_after_attempt, wait_exponential


@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=4, max=10),
)
def sync_with_retry():
    # Worker logic here
    pass
```
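tenacity's `wait_exponential` grows the wait as `multiplier * 2 ** n` and clamps it to `[min, max]`. With `multiplier=1, min=4` and `stop_after_attempt(3)`, a failing call is retried twice. Both computed waits fall at or below the `min=4` floor, so in practice each retry waits 4 seconds and the backoff never reaches `max=10`. The standard-library sketch below re-implements that policy so the schedule can be inspected. It is an illustration, not tenacity's code. The `sleep` parameter exists only to make it testable.

```python
import time
from functools import wraps


def backoff_wait(attempt, multiplier=1.0, min_wait=4.0, max_wait=10.0):
    """Seconds to wait after failed attempt `attempt` (1-based): multiplier * 2**(attempt-1), clamped."""
    return max(min_wait, min(multiplier * 2 ** (attempt - 1), max_wait))


def retry_with_backoff(max_attempts=3, sleep=time.sleep, **wait_kwargs):
    """Decorator: call the function up to max_attempts times, sleeping between failures."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except Exception:
                    if attempt == max_attempts:
                        raise  # out of attempts: surface the last error
                    sleep(backoff_wait(attempt, **wait_kwargs))
        return wrapper
    return decorator


print([backoff_wait(a) for a in range(1, 6)])  # -> [4.0, 4.0, 4.0, 8.0, 10.0]
```

To make the backoff grow visibly within three attempts, either `min` can be lowered or `multiplier` raised.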
Error Logging
```sql
-- Create the error log table
CREATE TABLE worker_errors (
    id SERIAL PRIMARY KEY,
    worker_name VARCHAR(100),
    error_type VARCHAR(100),
    error_message TEXT,
    stack_trace TEXT,
    tenant_id INTEGER,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- Errors from the last 24 hours, most frequent first
SELECT
    worker_name,
    error_type,
    COUNT(*) AS occurrences,
    MAX(created_at) AS last_occurrence
FROM worker_errors
WHERE created_at > CURRENT_TIMESTAMP - INTERVAL '24 hours'
GROUP BY worker_name, error_type
ORDER BY occurrences DESC;
```
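The sketch below shows how a worker might write to `worker_errors` from an exception handler. `build_error_row` and `log_worker_error` are hypothetical helpers. The insert assumes a psycopg2 connection (`%s` placeholders, cursor usable as a context manager). The row is built separately so it can be checked without a database.

```python
import traceback

INSERT_SQL = (
    "INSERT INTO worker_errors "
    "(worker_name, error_type, error_message, stack_trace, tenant_id) "
    "VALUES (%s, %s, %s, %s, %s)"
)


def build_error_row(worker_name, exc, tenant_id=None):
    """Map an exception to worker_errors columns (id and created_at use DB defaults)."""
    return (
        worker_name,
        type(exc).__name__,
        str(exc),
        "".join(traceback.format_exception(type(exc), exc, exc.__traceback__)),
        tenant_id,
    )


def log_worker_error(conn, worker_name, exc, tenant_id=None):
    """Insert one error row and commit, using a psycopg2-style connection."""
    with conn.cursor() as cur:
        cur.execute(INSERT_SQL, build_error_row(worker_name, exc, tenant_id))
    conn.commit()
```

A typical call site is `except Exception as exc: log_worker_error(conn, "vtex-sync-worker", exc, tenant_id)`, followed by re-raising or moving on to the next tenant.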
Performance Optimization
Database Connection Pooling
```python
# connection_pool.py
import os

import psycopg2.pool

# Reuse DATABASE_URL (set by Railway) instead of separate host/user/password settings.
connection_pool = psycopg2.pool.ThreadedConnectionPool(
    minconn=1,
    maxconn=10,
    dsn=os.environ["DATABASE_URL"],
)
```
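A connection borrowed from the pool must always be returned, even when a sync step fails. A small context manager around the pool's `getconn()` and `putconn()` methods keeps that in one place. `pooled_connection` is a hypothetical helper name. Committing on success and rolling back on error is a choice made by this sketch.

```python
from contextlib import contextmanager


@contextmanager
def pooled_connection(pool):
    """Borrow a connection, commit on success, roll back on error, always return it."""
    conn = pool.getconn()
    try:
        yield conn
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        pool.putconn(conn)
```

Usage with the pool above: `with pooled_connection(connection_pool) as conn:` and then open cursors on `conn` inside the block.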
Batch Processing
```python
# Process in batches of 1,000
# `data` is a list of records; `process_batch` is the worker's per-batch handler.
BATCH_SIZE = 1000
for i in range(0, len(data), BATCH_SIZE):
    batch = data[i:i + BATCH_SIZE]
    process_batch(batch)
```
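The slicing loop above requires `data` to be a list already loaded in memory. The generator below batches any iterable, such as rows streamed from a database cursor, without materializing the whole dataset. On Python 3.12+, `itertools.batched` provides the same behavior but yields tuples instead of lists.

```python
from itertools import islice

BATCH_SIZE = 1000


def batched(iterable, size=BATCH_SIZE):
    """Yield lists of up to `size` items from any iterable, consuming it lazily."""
    if size < 1:
        raise ValueError("size must be >= 1")
    it = iter(iterable)
    while batch := list(islice(it, size)):
        yield batch


print([len(b) for b in batched(range(2500))])  # -> [1000, 1000, 500]
```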
Caching
```python
from functools import lru_cache


@lru_cache(maxsize=100)
def get_tenant_config(tenant_id):
    # Cache per-tenant configuration (up to 100 tenants, least recently used evicted first)
    return fetch_config(tenant_id)
```
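`lru_cache` never expires entries. A tenant configuration changed in the database is therefore not picked up until the worker restarts or `get_tenant_config.cache_clear()` is called. When that matters, a time-based cache is one option. `ttl_cache` below is a hypothetical helper, and the third-party `cachetools` package also offers a `TTLCache`. The clock is injectable so expiry can be tested.

```python
import time
from functools import wraps


def ttl_cache(ttl_seconds=300, clock=time.monotonic):
    """Cache results per positional arguments for ttl_seconds (sketch: not thread-safe, unbounded)."""
    def decorator(func):
        entries = {}

        @wraps(func)
        def wrapper(*args):
            now = clock()
            hit = entries.get(args)
            if hit is not None and now - hit[0] < ttl_seconds:
                return hit[1]
            value = func(*args)
            entries[args] = (now, value)
            return value

        wrapper.cache_clear = entries.clear
        return wrapper
    return decorator
```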
Scalability
Adding a New Worker
- Create an entry point in the nerdistan-worker repo
- Configure variables in Railway
- Deploy as a new service
- Update the documentation
```bash
# Example: new email automation worker
railway service create email-automation-worker
railway variables --set "WORKER_MODE=email_automation"
railway up
```
Horizontal Scaling
Railway supports horizontal scaling:
```bash
# Scale to multiple instances
railway service scale email-automation-worker --replicas 3
```
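If a time-triggered worker runs as several replicas, each replica fires the same job unless something coordinates them. One common PostgreSQL-based safeguard is a session-level advisory lock: only the replica that obtains `pg_try_advisory_lock` runs the job. This is an assumption here; the current workers are not documented to do it. The sketch derives a stable 64-bit lock key from the job name and expects a psycopg2-style connection.

```python
import hashlib
from contextlib import contextmanager


def advisory_lock_key(name: str) -> int:
    """Stable signed 64-bit key (PostgreSQL bigint range) derived from a job name."""
    digest = hashlib.sha256(name.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big", signed=True)


@contextmanager
def run_exclusively(conn, job_name):
    """Yield True if this session obtained the advisory lock for job_name, else False."""
    key = advisory_lock_key(job_name)
    with conn.cursor() as cur:
        cur.execute("SELECT pg_try_advisory_lock(%s)", (key,))
        acquired = cur.fetchone()[0]
    try:
        yield acquired
    finally:
        if acquired:
            # Session-level locks must be released on the same connection.
            with conn.cursor() as cur:
                cur.execute("SELECT pg_advisory_unlock(%s)", (key,))
```

Usage: inside `with run_exclusively(conn, "daily_sync") as is_leader:`, only the replica with `is_leader == True` runs the job and the others skip it.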
Troubleshooting
Worker does not run
Diagnosis:
```bash
railway logs --service vtex-sync-worker | grep ERROR
```
Solutions:
- Check the environment variables
- Check DATABASE_URL
- Restart the service
Degraded performance
Diagnosis:
```sql
-- Slow queries: currently active statements, longest-running first
SELECT
    pid,
    now() - pg_stat_activity.query_start AS duration,
    query
FROM pg_stat_activity
WHERE state = 'active'
ORDER BY duration DESC;
```
Solutions:
- Optimize queries
- Add indexes
- Increase resources in Railway
Rate Limiting
Facebook/Google APIs:
- Implement exponential backoff
- Respect hourly limits
- Spread requests over time
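A sliding-window limiter can both respect an hourly quota and spread requests out. It keeps the timestamps of recent calls, and when the window is full it waits until the oldest call expires. The sketch below uses an injectable clock and sleep and is not thread-safe. The actual quota must come from each API's published limits, so `max_calls` is a placeholder.

```python
import time
from collections import deque


class SlidingWindowLimiter:
    """Allow at most `max_calls` per `window_seconds`; acquire() blocks until allowed."""

    def __init__(self, max_calls, window_seconds=3600.0, clock=time.monotonic, sleep=time.sleep):
        self.max_calls = max_calls
        self.window = window_seconds
        self.clock = clock
        self.sleep = sleep
        self.calls = deque()

    def acquire(self):
        while True:
            now = self.clock()
            # Drop timestamps that have left the window.
            while self.calls and now - self.calls[0] >= self.window:
                self.calls.popleft()
            if len(self.calls) < self.max_calls:
                self.calls.append(now)
                return
            # Window full: wait until the oldest call expires, then re-check.
            self.sleep(self.window - (now - self.calls[0]))
```

Each API call would be preceded by `limiter.acquire()`. When combined with the retry policy from Error Handling, the limiter keeps retries from consuming quota faster than allowed.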
Next Steps
Resources
- Repository: nerdistan-worker
- Railway Dashboard: Datalake Nerdistor
- Database: PostgreSQL on Railway
- Monitoring: Railway logs + database queries