CDP Processing API
Overview
El módulo de procesamiento CDP es el corazón del sistema, responsable de ejecutar análisis avanzados sobre los datos de clientes. Procesa más de 149,522 consumers en producción con análisis RFM, CLV y Churn.
🚀 Características Principales
- Procesamiento en batch de grandes volúmenes de datos
- Análisis RFM con 11 segmentos automáticos
- Cálculo de CLV predictivo
- Detección de churn con machine learning
- Cache inteligente para optimización de performance
📊 Estadísticas de Producción
| Tenant | Nombre | Consumers Procesados | Tiempo Promedio |
|---|---|---|---|
| 56 | Chelsea IO - Exit | 65,226 | 45s |
| 52 | Celada SA - BAPRO | 45,282 | 32s |
| 53 | JJ Deportes - BAPRO | 29,707 | 21s |
| 55 | Chelsea IO - Principal | 9,107 | 8s |
| 1 | Default | 200 | 2s |
API Endpoints
1. Process Tenant Data
Ejecuta análisis CDP completo o parcial para un tenant.
POST /api/v2/cdp/process/{tenantId}
Parámetros
| Parámetro | Tipo | Requerido | Descripción |
|---|---|---|---|
tenantId | integer | ✅ | ID del tenant a procesar |
Request Body
{
"analyses": ["rfm", "clv", "churn"],
"force": false,
"batch_size": 1000
}
| Campo | Tipo | Default | Descripción |
|---|---|---|---|
analyses | array | ["rfm"] | Análisis a ejecutar |
force | boolean | false | Forzar reprocesamiento |
batch_size | integer | 1000 | Tamaño del batch |
Response Success (200)
{
"success": true,
"message": "CDP processing completed successfully",
"tenant_id": 56,
"results": {
"rfm": {
"total_processed": 65226,
"segments": [
{
"segment": "Champions",
"count": 16125,
"percentage": 24.72
},
{
"segment": "Loyal Customers",
"count": 12500,
"percentage": 19.17
},
{
"segment": "Need Attention",
"count": 23010,
"percentage": 35.28
}
],
"processing_time": 45.2,
"errors": 0
},
"clv": {
"processed": 65226,
"avg_clv": 15000.50,
"processing_time": 28.3
},
"churn": {
"processed": 65226,
"at_risk": 4456,
"churn_rate": 0.068,
"processing_time": 31.5
}
},
"total_time": 105.0
}
Error Responses
404 Not Found
{
"success": false,
"error": "Tenant not found",
"details": "No tenant exists with ID: 999"
}
409 Conflict
{
"success": false,
"error": "Processing already in progress",
"details": "Another processing job is running for tenant 56"
}
2. Get Processing Status
Verifica el estado del procesamiento para un tenant.
GET /api/v2/cdp/process/status/{tenantId}
Response
{
"status": "processed",
"last_processed": "2024-09-16T19:16:45.665Z",
"hours_since_last_sync": 2,
"is_stale": false,
"records_processed": "45282",
"next_scheduled": "2024-09-17T01:00:00.000Z",
"processing_queue": {
"position": null,
"estimated_time": null
}
}
Estados Posibles
| Estado | Descripción |
|---|---|
never_processed | Nunca se ha procesado |
processing | Actualmente procesando |
processed | Procesamiento completado |
failed | Último procesamiento falló |
queued | En cola para procesamiento |
3. Schedule Processing
Programa procesamiento futuro para un tenant.
POST /api/v2/cdp/process/{tenantId}/schedule
Request Body
{
"schedule_at": "2024-09-17T03:00:00Z",
"recurring": true,
"frequency": "daily",
"analyses": ["rfm", "clv"]
}
4. Cancel Processing
Cancela un procesamiento en progreso.
DELETE /api/v2/cdp/process/{tenantId}/cancel
🔄 Flujo de Procesamiento
graph TD
A[Inicio] --> B{Tenant Válido?}
B -->|No| C[Error 404]
B -->|Sí| D{En Proceso?}
D -->|Sí| E[Error 409]
D -->|No| F[Obtener Consumers]
F --> G[Calcular RFM]
G --> H[Asignar Segmentos]
H --> I[Calcular CLV]
I --> J[Detectar Churn]
J --> K[Guardar en BD]
K --> L[Cache Results]
L --> M[Respuesta 200]
💾 Tablas de Base de Datos
cdp_rfm_analysis
CREATE TABLE cdp_rfm_analysis (
id BIGSERIAL PRIMARY KEY,
tenant_id BIGINT NOT NULL,
customer_id BIGINT NOT NULL,
email VARCHAR(255),
recency_days INTEGER,
frequency INTEGER,
monetary NUMERIC(15,2),
r_score INTEGER CHECK (r_score BETWEEN 1 AND 5),
f_score INTEGER CHECK (f_score BETWEEN 1 AND 5),
m_score INTEGER CHECK (m_score BETWEEN 1 AND 5),
rfm_score VARCHAR(3),
segment VARCHAR(50),
processed_at TIMESTAMP DEFAULT NOW()
);
🎯 Segmentos RFM
Los 11 segmentos automáticos:
-
Champions (555, 554, 544, 545, 454, 455, 445)
- Compraron recientemente, con frecuencia y gastan mucho
-
Loyal Customers (543, 444, 435, 355, 354, 345, 344, 335)
- Gastan buen dinero y responden a promociones
-
Potential Loyalists (553, 551, 552, 541, 542, 533, 532, 531, 452, 451)
- Clientes recientes con potencial
-
Recent Customers (512, 511, 422, 421, 412, 411, 311)
- Compraron recientemente por primera vez
-
Promising (525, 524, 523, 522, 521, 515, 514, 513, 425, 424, 413, 414, 415, 315, 314, 313)
- Compradores recientes con potencial de crecimiento
-
Need Attention (535, 534, 443, 434, 343, 334, 325, 324)
- Por encima del promedio pero en riesgo
-
About to Sleep (331, 321, 312, 221, 213, 231, 241, 251)
- Por debajo del promedio en todas las métricas
-
At Risk (255, 254, 245, 244, 253, 252, 243, 242, 235, 234, 225, 224, 153, 152, 145, 143, 142, 135, 134, 133, 125, 124)
- Gastaron mucho pero hace tiempo
-
Cannot Lose Them (155, 154, 144, 214, 215, 115, 114, 113)
- Grandes compradores que no han vuelto
-
Hibernating (332, 322, 231, 241, 151, 141, 131, 121, 112, 122, 123, 132, 211, 212, 222, 223, 232, 233)
- Última compra hace mucho tiempo
-
Lost (111)
- Menor puntuación en todas las métricas
🚀 Optimizaciones
Batch Processing
- Procesa en lotes de 1000 registros
- Uso de transacciones para consistencia
- Índices optimizados para búsquedas rápidas
Caching Strategy
- Cache de resultados por 1 hora
- Invalidación automática en nuevos procesamientos
- Cache warming para tenants principales
Performance Metrics
- Throughput: ~1,500 customers/segundo
- Memory usage: < 512MB para 100K customers
- CPU usage: ~60% en procesamiento peak
📝 Ejemplos de Uso
Python
import requests
# Procesar tenant
response = requests.post(
'https://nerdistan-datalake-production.up.railway.app/api/v2/cdp/process/56',
json={'analyses': ['rfm', 'clv']}
)
if response.status_code == 200:
data = response.json()
print(f"Procesados: {data['results']['rfm']['total_processed']} customers")
JavaScript
const processTenan = async (tenantId) => {
const response = await fetch(
`https://nerdistan-datalake-production.up.railway.app/api/v2/cdp/process/${tenantId}`,
{
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ analyses: ['rfm'] })
}
);
const data = await response.json();
console.log(`Procesados: ${data.results.rfm.total_processed} customers`);
};
processTenan(56);
CURL
# Procesar RFM para tenant 56
curl -X POST \
https://nerdistan-datalake-production.up.railway.app/api/v2/cdp/process/56 \
-H "Content-Type: application/json" \
-d '{"analyses": ["rfm"]}'
# Verificar estado
curl https://nerdistan-datalake-production.up.railway.app/api/v2/cdp/process/status/56
🔧 Troubleshooting
Problema: Procesamiento lento
Solución:
- Reducir
batch_sizea 500 - Verificar índices en base de datos
- Procesar en horarios de baja carga
Problema: Error 409 Conflict
Solución:
- Esperar que termine el procesamiento actual
- Usar endpoint
/cancelsi está atascado - Verificar con
/statusel estado actual
Problema: Datos no actualizados
Solución:
- Usar parámetro
force: true - Verificar
is_staleen status - Revisar
last_processedtimestamp