Facebook Ads Integration - Architecture
Architecture Diagram
flowchart TB
subgraph Railway["Railway Platform"]
FB[Facebook Automation Worker]
DB[(PostgreSQL Database)]
end
subgraph CDP["CDP System"]
CFG[facebook_configurations]
CUST[customers_master]
RFM[cdp_rfm_analysis]
CLV[cdp_clv_analysis]
end
subgraph Facebook["Facebook Platform"]
API[Facebook Ads API]
AUD1[Custom Audience - Kangoo]
AUD2[Custom Audience - Septimo]
AUD3[Custom Audience - DigitalFarma]
end
FB -->|Query Config| CFG
FB -->|Extract Customers| CUST
FB -->|Get Segments| RFM
FB -->|Get CLV| CLV
FB -->|Sync Audiences| API
API --> AUD1
API --> AUD2
API --> AUD3
CFG --> DB
CUST --> DB
RFM --> DB
CLV --> DB
Main Components
1. Worker Service (Railway)
Entry Point: start_facebook_automation.py
# Main worker flow
1. Load configurations from the DB
2. For each system:
   a. Get the list of tenants
   b. Extract customers from each tenant
   c. Consolidate audiences
   d. Hash data (SHA-256)
   e. Sync via the Facebook API
   f. Validate and record results
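The flow above can be sketched as a single loop over the active systems. This is a minimal illustration, not the contents of start_facebook_automation.py: `run_sync_cycle` is a hypothetical name, and the step functions are passed in as parameters so the sketch stays self-contained. It assumes that a failure in one system should be recorded without stopping the others.

```python
from typing import Callable, Iterable


def run_sync_cycle(
    configs: Iterable[dict],
    extract: Callable[[list], list],
    prepare: Callable[[list], dict],
    sync: Callable[[str, dict, str], dict],
    on_success: Callable[[str, dict], None],
    on_error: Callable[[str, Exception], None],
) -> dict:
    """Run one pass over all systems and report a status per system."""
    outcome = {}
    for cfg in configs:
        name = cfg["system_name"]
        try:
            customers = extract(cfg["tenant_ids"])              # steps a-c
            payload = prepare(customers)                        # step d
            result = sync(cfg["ad_account_id"], payload, name)  # step e
            on_success(name, result)                            # step f
            outcome[name] = "success"
        except Exception as exc:  # assumption: isolate failures per system
            on_error(name, exc)
            outcome[name] = "failed"
    return outcome
```

In the worker, the five callables would be the functions described under the synchronization flow (extraction, payload preparation, sync, status update and error logging).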
Environment Variables:
DATABASE_URL=postgresql://...
FACEBOOK_ACCESS_TOKEN=EAAJs4xreU3IBPp7...
WORKER_MODE=facebook
RUN_MODE=scheduler
TZ=America/Argentina/Buenos_Aires
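A worker usually reads these variables once at startup and fails fast if a required one is missing. The sketch below shows one way to do that. `WorkerSettings` and `load_settings` are hypothetical names, and both the choice of which variables are mandatory and the fallback defaults are assumptions taken from the sample values above.

```python
import os
from dataclasses import dataclass


@dataclass(frozen=True)
class WorkerSettings:
    database_url: str
    facebook_access_token: str
    worker_mode: str
    run_mode: str
    timezone: str


def load_settings(env=None) -> WorkerSettings:
    """Read worker settings from a mapping (defaults to os.environ)."""
    env = os.environ if env is None else env
    # Assumption: only the DB URL and the access token are mandatory.
    required = ("DATABASE_URL", "FACEBOOK_ACCESS_TOKEN")
    missing = [key for key in required if not env.get(key)]
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
    return WorkerSettings(
        database_url=env["DATABASE_URL"],
        facebook_access_token=env["FACEBOOK_ACCESS_TOKEN"],
        worker_mode=env.get("WORKER_MODE", "facebook"),
        run_mode=env.get("RUN_MODE", "scheduler"),
        timezone=env.get("TZ", "America/Argentina/Buenos_Aires"),
    )
```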
2. Database
Table: cdp.facebook_configurations
CREATE TABLE cdp.facebook_configurations (
    id SERIAL PRIMARY KEY,
    system_name VARCHAR(100) UNIQUE NOT NULL,
    ad_account_id VARCHAR(50) NOT NULL,
    tenant_ids INTEGER[] NOT NULL,
    custom_audience_id VARCHAR(50),
    is_active BOOLEAN DEFAULT true,
    schedule_hours INTEGER DEFAULT 48,
    last_sync TIMESTAMP WITH TIME ZONE,
    next_sync TIMESTAMP WITH TIME ZONE,
    total_customers INTEGER DEFAULT 0,
    sync_status VARCHAR(50),
    error_log TEXT,
    created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
);
Current Configurations:
| system_name | ad_account_id | tenant_ids | custom_audience_id |
|---|---|---|---|
| kangoo_system | act_323379512251780 | [20, 25] | - |
| septimo_system | act_881526052911252 | [100, 107, 112, 118] | - |
| digitalfarma_system | act_2850528471874859 | [28] | - |
3. Facebook Ads API Integration
API Version: v18.0
Endpoint Base: https://graph.facebook.com/v18.0/
Custom Audiences API
# Create a Custom Audience
POST /{ad_account_id}/customaudiences
{
    "name": "CDP - Kangoo System - 2024-10",
    "subtype": "CUSTOM",
    "description": "Audiencia sincronizada desde Nerdistan CDP",
    "customer_file_source": "USER_PROVIDED_ONLY"
}
# Add users to the Audience
POST /{custom_audience_id}/users
{
    "payload": {
        "schema": ["EMAIL", "PHONE", "FN", "LN"],
        "data": [
            ["hash_email", "hash_phone", "hash_firstname", "hash_lastname"]
        ]
    }
}
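The add-users request above can be assembled without touching the network, which makes it easy to inspect before sending. The sketch below is illustrative only: `build_add_users_request` and `GRAPH_BASE` are hypothetical names, and it assumes the call is sent as form fields with `payload` JSON-encoded, which is one common way to call the Graph API and should be checked against the client actually in use.

```python
import json

GRAPH_BASE = "https://graph.facebook.com/v18.0"


def build_add_users_request(custom_audience_id, schema, rows, access_token):
    """Return (url, form_fields) for POST /{custom_audience_id}/users."""
    # Each data row must carry exactly one value per schema field.
    if any(len(row) != len(schema) for row in rows):
        raise ValueError("every data row must have one value per schema field")
    url = f"{GRAPH_BASE}/{custom_audience_id}/users"
    form = {
        "payload": json.dumps({"schema": schema, "data": rows}),
        "access_token": access_token,  # assumption: token passed as a field
    }
    return url, form
```

The returned pair can then be handed to an HTTP client, for example `requests.post(url, data=form)`.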
Data Hashing (SHA-256)
import hashlib

def hash_data(value: str) -> str:
    """Hash a value with SHA-256 per Facebook's specification."""
    if not value:
        return ""
    # Normalize: trim surrounding whitespace and lowercase
    normalized = value.strip().lower()
    # Hash the UTF-8 bytes and return the hex digest
    return hashlib.sha256(normalized.encode('utf-8')).hexdigest()
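`hash_data` applies the same normalization to every field. Meta's customer-file guidance, as generally documented, asks for slightly different cleanup per field, notably phone numbers reduced to digits with no symbols or leading zeros. The sketch below shows what field-specific helpers could look like. The helper names are hypothetical and the exact rules are an assumption to verify against the current Marketing API documentation. Country-code prefixing is deliberately left out.

```python
import hashlib
import re


def sha256_hex(value: str) -> str:
    """SHA-256 hex digest of a non-empty string, or "" for empty input."""
    return hashlib.sha256(value.encode("utf-8")).hexdigest() if value else ""


def hash_email(email) -> str:
    # Emails: trim whitespace and lowercase.
    return sha256_hex((email or "").strip().lower())


def hash_phone(phone) -> str:
    # Phones (assumed rule): digits only, leading zeros removed.
    digits = re.sub(r"\D", "", phone or "").lstrip("0")
    return sha256_hex(digits)


def hash_name(name) -> str:
    # Names (assumed rule): lowercase, punctuation removed, whitespace trimmed.
    cleaned = re.sub(r"[^\w\s]|_", "", (name or "").lower()).strip()
    return sha256_hex(cleaned)
```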
4. Scheduler System
Cron Schedule: Every 48 hours (10:00 AM Argentina)
# Scheduler logic in start_facebook_automation.py
import schedule
import time
from datetime import datetime, timedelta

def schedule_sync():
    """Schedule a synchronization every 48 hours."""
    # run_facebook_sync is the worker's sync entry point, defined elsewhere
    schedule.every(48).hours.do(run_facebook_sync)
    while True:
        schedule.run_pending()
        time.sleep(3600)  # Check once per hour
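An interval of 48 hours measured from process start does not by itself pin runs to 10:00 AM. If the 10:00 AM Argentina slot matters, the next run time can be computed explicitly. The sketch below uses only the standard library. `next_run` and `ART` are hypothetical names, and the fixed UTC-3 offset is an assumption that holds while Argentina observes no daylight saving time.

```python
from datetime import datetime, time, timedelta, timezone

# Assumption: Argentina is at a fixed UTC-3 offset (no DST).
ART = timezone(timedelta(hours=-3), "ART")


def next_run(last_run: datetime, every_days: int = 2,
             run_at: time = time(10, 0)) -> datetime:
    """Return the next slot: `every_days` after last_run's local date, at run_at."""
    local = last_run.astimezone(ART)
    next_date = local.date() + timedelta(days=every_days)
    return datetime.combine(next_date, run_at, tzinfo=ART)
```

Anchoring on the local calendar date rather than adding 48 hours to the finish time keeps the schedule from drifting when a sync runs long.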
Synchronization Flow
Step 1: Load Configurations
def load_active_configurations():
    """Load the active systems from the DB."""
    query = """
        SELECT
            system_name,
            ad_account_id,
            tenant_ids,
            custom_audience_id,
            schedule_hours
        FROM cdp.facebook_configurations
        WHERE is_active = true
        ORDER BY system_name
    """
    return execute_query(query)
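The snippets in this flow call an `execute_query` helper that is not shown. A minimal sketch of such a helper follows, assuming a DB-API 2.0 connection (for example one from psycopg2) and assuming that rows are returned as dictionaries keyed by column name, as the later steps index them. `make_execute_query` is a hypothetical factory. The real helper may differ.

```python
def make_execute_query(conn):
    """Bind a DB-API connection and return an execute_query(query, params) helper."""

    def execute_query(query, params=()):
        cur = conn.cursor()
        try:
            cur.execute(query, params)
            if cur.description is None:
                # Statement returned no result set (INSERT/UPDATE/DDL): commit it.
                conn.commit()
                return []
            columns = [col[0] for col in cur.description]
            return [dict(zip(columns, row)) for row in cur.fetchall()]
        finally:
            cur.close()

    return execute_query
```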
Step 2: Extract Customers
def get_customers_for_system(tenant_ids: list):
    """Extract customers from multiple tenants."""
    query = """
        SELECT DISTINCT
            c.email,
            c.phone,
            c.firstname,
            c.lastname,
            c.tenant_id
        FROM customers_master c
        INNER JOIN cdp_rfm_analysis r ON c.id = r.customer_id
        WHERE c.tenant_id = ANY(%s)
            AND c.email IS NOT NULL
            AND c.email != ''
            AND r.rfm_segment NOT IN ('Lost', 'Hibernating')
        ORDER BY c.tenant_id, c.email
    """
    return execute_query(query, (tenant_ids,))
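Because `tenant_id` is part of the `SELECT DISTINCT` column list, a customer who exists in two tenants of the same system comes back as two rows. The consolidation step could collapse those rows by email, as in the sketch below. `consolidate_customers` is a hypothetical name, and keeping the first occurrence is an assumption about the desired policy.

```python
def consolidate_customers(rows):
    """Keep one row per email (case-insensitive), preserving first-seen order."""
    seen = {}
    for row in rows:
        key = (row.get("email") or "").strip().lower()
        if key and key not in seen:
            seen[key] = row
    return list(seen.values())
```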
Step 3: Prepare Payload
def prepare_facebook_payload(customers: list):
    """Prepare the data for the Facebook API."""
    data = []
    for customer in customers:
        # Column order must match the schema below
        row = [
            hash_data(customer['email']),
            hash_data(customer['phone']),
            hash_data(customer['firstname']),
            hash_data(customer['lastname'])
        ]
        data.append(row)
    return {
        "schema": ["EMAIL", "PHONE", "FN", "LN"],
        "data": data
    }
Step 4: Sync to Facebook
from datetime import datetime

def sync_to_facebook(ad_account_id, payload, system_name):
    """Synchronize the audience to Facebook."""
    # 1. Create or update the Custom Audience
    audience_id = create_or_update_audience(
        ad_account_id=ad_account_id,
        name=f"CDP - {system_name} - {datetime.now().strftime('%Y-%m')}",
        description="Audiencia sincronizada desde Nerdistan CDP"
    )
    # 2. Add users (batches of 10,000)
    batch_size = 10000
    total_added = 0
    for i in range(0, len(payload['data']), batch_size):
        batch = payload['data'][i:i+batch_size]
        response = add_users_to_audience(
            audience_id=audience_id,
            schema=payload['schema'],
            data=batch
        )
        total_added += response.get('num_received', 0)
    return {
        'audience_id': audience_id,
        'total_added': total_added
    }
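The slicing loop in step 2 of the sync can be factored into a small generator, which makes the batch boundaries easy to test on their own. `iter_batches` is a hypothetical helper name. The default of 10,000 mirrors the batch size used above.

```python
from typing import Iterator, Sequence


def iter_batches(rows: Sequence, batch_size: int = 10_000) -> Iterator[Sequence]:
    """Yield consecutive slices of at most batch_size rows."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    for start in range(0, len(rows), batch_size):
        yield rows[start:start + batch_size]
```

For example, 25,000 rows produce batches of 10,000, 10,000 and 5,000.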
Step 5: Update Status
def update_sync_status(system_name, result):
    """Update the synchronization status in the DB."""
    query = """
        UPDATE cdp.facebook_configurations
        SET
            last_sync = CURRENT_TIMESTAMP,
            next_sync = CURRENT_TIMESTAMP + INTERVAL '48 hours',
            total_customers = %s,
            custom_audience_id = %s,
            sync_status = 'success',
            error_log = NULL,
            updated_at = CURRENT_TIMESTAMP
        WHERE system_name = %s
    """
    execute_query(query, (
        result['total_added'],
        result['audience_id'],
        system_name
    ))
Error Handling
Retry Logic
import logging

import requests
from tenacity import retry, stop_after_attempt, wait_exponential

logger = logging.getLogger(__name__)

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=4, max=10)
)
def facebook_api_call_with_retry(endpoint, payload):
    """API call with automatic retry."""
    try:
        # A timeout keeps a hung connection from blocking the worker
        response = requests.post(endpoint, json=payload, timeout=30)
        response.raise_for_status()
        return response.json()
    except Exception as e:
        logger.error(f"API call failed: {e}")
        raise  # Re-raise so tenacity can schedule the next attempt
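To make the retry policy concrete, here is a standard-library sketch of the same idea: up to three attempts with an exponential wait clamped between 4 and 10 seconds. It is not tenacity's implementation and is not claimed to reproduce its exact wait schedule. `backoff_delay` and `call_with_retry` are hypothetical names.

```python
import time


def backoff_delay(attempt, multiplier=1.0, min_wait=4.0, max_wait=10.0):
    """Exponential delay for a 1-based attempt number, clamped to [min_wait, max_wait]."""
    return max(min_wait, min(multiplier * 2 ** (attempt - 1), max_wait))


def call_with_retry(fn, attempts=3, sleep=time.sleep):
    """Call fn(); on failure wait and retry, re-raising after the last attempt."""
    for attempt in range(1, attempts + 1):
        try:
            return fn()
        except Exception:
            if attempt == attempts:
                raise
            sleep(backoff_delay(attempt))
```

Passing `sleep` as a parameter lets tests record the delays instead of actually waiting.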
Error Logging
def log_sync_error(system_name, error):
    """Record errors in the DB."""
    query = """
        UPDATE cdp.facebook_configurations
        SET
            sync_status = 'failed',
            error_log = %s,
            updated_at = CURRENT_TIMESTAMP
        WHERE system_name = %s
    """
    execute_query(query, (str(error), system_name))
Security and Privacy
1. Hashing of Personal Data
All personal data (email, phone, name) is hashed with SHA-256 before being sent to Facebook.
2. Access Token Management
# Token with specific permissions
REQUIRED_PERMISSIONS = [
    'ads_management',
    'ads_read',
    'business_management'
]
# Long-lived token (60 days)
# Renew before expiration
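The renewal reminder can be reduced to a date comparison. The sketch below assumes the worker knows when the token was issued and that renewing one week before the 60-day expiry is acceptable. `token_needs_renewal`, `TOKEN_LIFETIME` and the 7-day margin are all hypothetical choices.

```python
from datetime import datetime, timedelta, timezone

TOKEN_LIFETIME = timedelta(days=60)  # long-lived token duration stated above


def token_needs_renewal(issued_at, now=None, renew_before=timedelta(days=7)):
    """True once the token is within `renew_before` of its expiry."""
    now = now or datetime.now(timezone.utc)
    return now >= issued_at + TOKEN_LIFETIME - renew_before
```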
3. Data Privacy
- Only active customers are synced (the Lost and Hibernating RFM segments are excluded)
- Emails are validated (non-empty, correct format)
- GDPR/LGPD compliance
- Opt-out respected
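The extraction query only filters out NULL and empty emails, so a format check would have to happen in the worker. A deliberately simple sketch follows. `is_syncable_email` and the pattern are hypothetical, and the pattern is a pragmatic shape check rather than full RFC 5322 validation.

```python
import re

# Assumption: "correct format" means local@domain.tld with no spaces or extra "@".
EMAIL_RE = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$")


def is_syncable_email(email) -> bool:
    """True for non-empty strings that look like an email address."""
    return bool(email) and EMAIL_RE.match(email.strip()) is not None
```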
Performance
Implemented Optimizations
- Batch Processing: 10,000 users per request
- Database Indexing: Indexes on tenant_id, email
- Async Processing: Parallel processes per system
- Caching: Configurations cached in memory
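Per-system parallelism can be sketched with a thread pool, since each system's sync is dominated by network and database waits. This is an illustration under that assumption, not the worker's actual mechanism, which may use processes or asyncio instead. `sync_systems_in_parallel` is a hypothetical name.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def sync_systems_in_parallel(systems, sync_one, max_workers=3):
    """Run sync_one(system) concurrently; map each system to its result or exception."""
    results = {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {pool.submit(sync_one, system): system for system in systems}
        for future in as_completed(futures):
            system = futures[future]
            try:
                results[system] = future.result()
            except Exception as exc:  # keep other systems running
                results[system] = exc
    return results
```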
Expected Metrics
| Metric | Value |
|---|---|
| Sync Time (50k users) | ~5-10 minutes |
| API Rate Limit | 200 requests/hour |
| Batch Size | 10,000 users |
| Retry Attempts | 3 max |
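The figures in the table can be related with a little arithmetic: 50,000 users at 10,000 per request is 5 add-user requests, far below a limit of 200 requests per hour. The sketch below encodes that check. The function names are hypothetical, and it counts only add-user requests, ignoring audience creation calls and retries.

```python
import math


def requests_needed(total_users: int, batch_size: int = 10_000) -> int:
    """Number of add-user requests required for total_users."""
    return math.ceil(total_users / batch_size)


def fits_rate_limit(total_users: int, batch_size: int = 10_000,
                    limit_per_hour: int = 200) -> bool:
    """True if one sync stays within the hourly request budget."""
    return requests_needed(total_users, batch_size) <= limit_per_hour
```

Under these assumptions a single hour's budget covers up to 200 x 10,000 = 2,000,000 users.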
Scalability
Adding New Systems
-- Insert a new configuration
INSERT INTO cdp.facebook_configurations (
    system_name,
    ad_account_id,
    tenant_ids,
    is_active,
    schedule_hours
) VALUES (
    'nuevo_sistema',
    'act_123456789',
    ARRAY[30, 31, 32],
    true,
    48
);
The worker automatically detects and processes the new configuration on its next cycle.
Multi-Region Support
The system is designed to support multiple regions:
- Per-timezone configuration
- Custom schedules per system
- Region-specific ad accounts
Monitoring Points
- Database Logs: cdp.facebook_configurations.sync_status
- Worker Logs: Railway service logs
- Facebook Insights: Custom Audience size & reach
- Error Alerts: Slack/Email notifications (planned)