System Architecture
High-Level Architecture
graph TB
subgraph "Scheduled Automation"
CRON1[Every 5 min<br/>sync:manager]
CRON2[Every 1 min<br/>Queue Processors]
CRON3[Daily<br/>Maintenance]
end
subgraph "Manual Triggers"
WEB[Web UI Actions]
API[API Endpoints]
CLI[Console Commands]
end
subgraph "Message Bus Layer"
PROFILE[SyncProfileCommand<br/>Master Router]
COMMANDS[39 Commands]
end
subgraph "Provider Handlers"
KPN[KPN SP16<br/>Handler]
VOD[Vodafone<br/>Handler]
TMO[T-Mobile<br/>Handler]
YIELD[Yielder<br/>Handler]
end
subgraph "Chain Execution"
CHAIN[MessageConsumedHandler<br/>Event Subscriber]
end
subgraph "Data Layer"
DB[(Database)]
QUEUE[Import Queues]
end
CRON1 --> PROFILE
CRON2 --> CLI
CRON3 --> CLI
WEB --> COMMANDS
API --> COMMANDS
CLI --> COMMANDS
PROFILE --> KPN
PROFILE --> VOD
PROFILE --> TMO
PROFILE --> YIELD
KPN --> CHAIN
VOD --> CHAIN
TMO --> CHAIN
YIELD --> CHAIN
CHAIN --> COMMANDS
COMMANDS --> DB
COMMANDS --> QUEUE
style CRON1 fill:#ffebee,stroke:#c62828
style PROFILE fill:#fff3e0,stroke:#e65100
style CHAIN fill:#e8f5e9,stroke:#2e7d32
Master Sync Flow
This is the most critical automation flow, running every 5 minutes:
sequenceDiagram
participant Cron
participant SyncManager
participant SyncService
participant SyncProfile
participant Router
participant Provider
participant Chain
Cron->>SyncManager: */5 * * * *
SyncManager->>SyncService: run()
SyncService->>SyncService: Query active profiles
loop For each profile
SyncService->>SyncProfile: dispatch()
SyncProfile->>Router: route by type
alt KPN_SP16
Router->>Provider: KpnSp16SyncCommand
else VODAFONE
Router->>Provider: VodafoneSyncCommand
else T_MOBILE
Router->>Provider: TMobileCalviSyncCommand
end
Provider->>Provider: Sync data
Provider->>Chain: DispatchTotalsCommand<br/>+ chain
Chain->>Chain: Process chain
loop For each chained command
Chain->>Provider: Execute command
end
end
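The routing step in the sequence above can be sketched in plain PHP. This is a minimal illustration, not the project's actual `SyncProfileHandler`: the three command class names come from the diagram, while the `profileId` property, the array shape of a profile, and the `routeProfile()` and `runSyncManager()` helpers are assumptions made for the example.

```php
<?php
declare(strict_types=1);

// Hypothetical stand-ins: only the class names appear in the diagram above.
final class KpnSp16SyncCommand { public function __construct(public int $profileId) {} }
final class VodafoneSyncCommand { public function __construct(public int $profileId) {} }
final class TMobileCalviSyncCommand { public function __construct(public int $profileId) {} }

/** Maps a profile type to its provider-specific sync command. */
function routeProfile(string $type, int $profileId): object
{
    return match ($type) {
        'KPN_SP16' => new KpnSp16SyncCommand($profileId),
        'VODAFONE' => new VodafoneSyncCommand($profileId),
        'T_MOBILE' => new TMobileCalviSyncCommand($profileId),
        default => throw new InvalidArgumentException("No sync command for profile type: $type"),
    };
}

/**
 * One sync:manager run: skip inactive profiles, route the rest.
 *
 * @param array<int, array{id: int, type: string, active: bool}> $profiles
 * @return object[] commands that would be handed to the message bus
 */
function runSyncManager(array $profiles): array
{
    $commands = [];
    foreach ($profiles as $profile) {
        if (!$profile['active']) {
            continue;
        }
        $commands[] = routeProfile($profile['type'], $profile['id']);
    }
    return $commands;
}

$commands = runSyncManager([
    ['id' => 1, 'type' => 'KPN_SP16', 'active' => true],
    ['id' => 2, 'type' => 'VODAFONE', 'active' => false],
    ['id' => 3, 'type' => 'T_MOBILE', 'active' => true],
]);

foreach ($commands as $command) {
    echo $command::class, ' for profile ', $command->profileId, PHP_EOL;
}
```

An unknown profile type throws instead of being skipped silently, which matches the "any routing failure" alert threshold listed under Monitoring Points.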
Command Chaining Architecture
The chain execution system enables complex multi-step workflows:
graph LR
subgraph "Command with Chain"
CMD[Command A]
CHAIN["withChain([B, C, D])"]
CMD --> CHAIN
end
subgraph "Handler Execution"
HANDLE[Handler A Executes]
end
subgraph "Event System"
EVENT[WorkerMessageHandledEvent]
SUB[MessageConsumedHandler]
EVENT --> SUB
end
subgraph "Chain Processing"
CHECK{Has Chain?}
NEXT[Dispatch B]
NEXT2[Dispatch C]
NEXT3[Dispatch D]
CHECK -->|Yes| NEXT
NEXT --> NEXT2
NEXT2 --> NEXT3
end
CHAIN --> HANDLE
HANDLE --> EVENT
SUB --> CHECK
style CMD fill:#e3f2fd
style EVENT fill:#fff3e0
style CHECK fill:#ffebee
Key Point: Every command that completes triggers WorkerMessageHandledEvent, which MessageConsumedHandler uses to execute the next command in the chain.
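To make the chaining mechanism concrete, here is a self-contained simulation in plain PHP. It does not use Symfony Messenger: `ChainableCommand`, `nextInChain()`, and `runWithChain()` are hypothetical names, and the loop merely stands in for what the worker and `MessageConsumedHandler` do between them. Only `withChain()` is taken from the diagram.

```php
<?php
declare(strict_types=1);

// Hypothetical command base class; the real commands may store their chain differently.
class ChainableCommand
{
    /** @var ChainableCommand[] commands still to run after this one */
    private array $chain = [];

    public function __construct(public string $name) {}

    /** Returns a copy of this command carrying the given follow-up commands. */
    public function withChain(array $commands): static
    {
        $clone = clone $this;
        $clone->chain = $commands;
        return $clone;
    }

    /** Returns the next command, carrying the rest of the chain, or null if none is left. */
    public function nextInChain(): ?ChainableCommand
    {
        if ($this->chain === []) {
            return null;
        }
        $rest = $this->chain;
        $next = array_shift($rest);
        return $next->withChain($rest);
    }
}

/**
 * Simulates the worker loop and returns command names in execution order.
 *
 * @return string[]
 */
function runWithChain(ChainableCommand $command): array
{
    $executed = [];
    $queue = [$command];
    while ($queue !== []) {
        $current = array_shift($queue);
        $executed[] = $current->name;     // stand-in for the handler running
        $next = $current->nextInChain();  // stand-in for the "Has Chain?" check
        if ($next !== null) {
            $queue[] = $next;             // stand-in for dispatching to the bus
        }
    }
    return $executed;
}

$a = (new ChainableCommand('A'))->withChain([
    new ChainableCommand('B'),
    new ChainableCommand('C'),
    new ChainableCommand('D'),
]);

echo implode(' -> ', runWithChain($a)), PHP_EOL; // A -> B -> C -> D
```

Each dispatched command carries the remainder of the chain with it, so the subscriber needs no state of its own: it only inspects the command that has just been handled.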
Component Layers
Layer 1: Entry Points (4 types)
Layer 2: Message Bus (40 Commands → 36 Handlers)
graph LR
A[Console/Web/API] --> B[MessageBus]
B --> C{Command Type}
C -->|Sync| D[SyncProfileHandler]
C -->|Totals| E[CalculateTotalsHandler]
C -->|Import| F[Import Handlers]
C -->|CDR| G[CDR Handlers]
D --> H[Provider Handlers]
E --> I[Calculation Services]
F --> J[Import Services]
G --> K[CDR Services]
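The command-to-handler lookup in this layer can be illustrated with a tiny in-memory bus. This is a sketch of the idea only, not how Symfony Messenger resolves handlers: `SimpleBus` is a hypothetical class, and the `profileId` and `subscriptionId` properties are assumptions.

```php
<?php
declare(strict_types=1);

// Hypothetical in-memory bus: one handler per command class.
final class SimpleBus
{
    /** @var array<string, callable> handlers keyed by command class name */
    private array $handlers = [];

    public function register(string $commandClass, callable $handler): void
    {
        $this->handlers[$commandClass] = $handler;
    }

    public function dispatch(object $command): mixed
    {
        $handler = $this->handlers[$command::class]
            ?? throw new LogicException('No handler for ' . $command::class);

        return $handler($command);
    }
}

// Command names are from this document; their properties are assumed.
final class SyncProfileCommand { public function __construct(public int $profileId) {} }
final class CalculateTotalsCommand { public function __construct(public int $subscriptionId) {} }

$bus = new SimpleBus();
$bus->register(SyncProfileCommand::class, fn (SyncProfileCommand $c) => "sync profile {$c->profileId}");
$bus->register(CalculateTotalsCommand::class, fn (CalculateTotalsCommand $c) => "totals for {$c->subscriptionId}");

echo $bus->dispatch(new SyncProfileCommand(7)), PHP_EOL;
echo $bus->dispatch(new CalculateTotalsCommand(42)), PHP_EOL;
```

Keying handlers by command class keeps the entry points (console, web, API) unaware of which handler runs; they only construct a command and dispatch it.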
Layer 3: Event System (Chain Executor)
graph TB
A[Handler Completes] --> B[WorkerMessageHandledEvent]
B --> C[MessageConsumedHandler]
C --> D{Has Chain?}
D -->|Yes| E[Dispatch Next Command]
D -->|No| F[Remove from DB]
E --> A
Layer 4: Data Persistence
graph LR
A[Handlers] --> B[(sync_task)]
A --> C[(call_detail_record)]
A --> D[(totals)]
A --> E[(import_queue)]
A --> F[(subscription)]
style B fill:#e8f5e9
style C fill:#e1f5fe
style D fill:#fff3e0
style E fill:#fce4ec
style F fill:#f3e5f5
Critical Dependencies
Single Points of Failure
MessageConsumedHandler
Location: src/MessageBus/MessageConsumedHandler.php
Purpose: Enables ALL command chaining
Impact if fails: Complete automation breakdown - no chains execute
Mitigation: Monitor for exceptions, ensure Symfony Messenger is healthy
SyncProfileHandler
Location: src/MessageBus/AsynchronousHandler/SyncProfileHandler.php
Purpose: Routes all sync commands to provider-specific handlers
Impact if fails: All provider syncs stop
Mitigation: Monitor sync_task status, test routing logic
sync:manager Cron
Schedule: */5 * * * *
Purpose: Master orchestrator for all scheduled syncs
Impact if fails: No automated syncs execute
Mitigation: Monitor cron execution, alert on failures
Provider Architecture
Each provider follows the same pattern:
graph TD
A[Provider Sync Command] --> B[Provider Handler]
B --> C[Import Service]
C --> D[Scraper/API Client]
D --> E[External Provider]
B --> F[DispatchTotalsCommand]
F --> G[Totals Handler]
G --> H[DispatchTotalsBackupCommand]
style A fill:#e3f2fd
style B fill:#e8f5e9
style C fill:#fff3e0
style D fill:#fce4ec
style E fill:#ffebee
Providers:
- KPN SP16/GRIP
- Vodafone/Odido
- T-Mobile Calvi
- KPN EEN
- Yielder
- Routit (mobile)
- Telfort (legacy)
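The shared provider pattern (handler calls an import service, then queues totals) might look roughly like the sketch below. The `ImportService` interface, its `import()` signature, the `ProviderSyncHandler` class, and the closure used in place of the message bus are all assumptions for illustration; the source names the roles but not their signatures.

```php
<?php
declare(strict_types=1);

// Hypothetical interface for the "Import Service" box in the diagram.
interface ImportService
{
    /** @return int number of records imported */
    public function import(int $profileId): int;
}

// Command name is from this document; its property is assumed.
final class DispatchTotalsCommand { public function __construct(public int $profileId) {} }

// Hypothetical handler showing the two steps every provider shares.
final class ProviderSyncHandler
{
    public function __construct(
        private ImportService $importService,
        private Closure $dispatch, // stand-in for the message bus
    ) {}

    public function __invoke(int $profileId): int
    {
        // Step 1: pull data through the import service (scraper or API client behind it).
        $imported = $this->importService->import($profileId);

        // Step 2: queue the totals calculation for the freshly imported data.
        ($this->dispatch)(new DispatchTotalsCommand($profileId));

        return $imported;
    }
}

// Fake import service standing in for a real scraper or API client.
$fakeImport = new class implements ImportService {
    public function import(int $profileId): int
    {
        return 3;
    }
};

$dispatched = [];
$handler = new ProviderSyncHandler($fakeImport, function (object $command) use (&$dispatched): void {
    $dispatched[] = $command;
});

$count = $handler(12);
echo "imported $count records, queued ", get_class($dispatched[0]), PHP_EOL;
```

Putting the provider-specific work behind one interface is what would let a new provider reuse the same handler shape and totals chain.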
Data Flow Example: KPN SP16 Sync
flowchart TD
Start[Cron triggers sync:manager] --> Create[Create SyncTask]
Create --> Dispatch[Dispatch SyncProfileCommand]
Dispatch --> Route{Route by type}
Route -->|KPN_SP16| KPN[KpnSp16SyncCommand]
KPN --> Handler[KpnSp16SyncHandler]
Handler --> Import[KpnSp16ImportCommand]
Import --> Puppet[PupeteerKpnSp16SyncCommand]
Puppet --> Scrape[Scrape KPN Portal]
Scrape --> Totals[DispatchTotalsCommand]
Totals --> Calc[CalculateTotalsCommand]
Calc --> Backup[DispatchTotalsBackupCommand]
Backup --> Done[Update SyncTask: COMPLETED]
style Start fill:#e3f2fd
style Route fill:#fff3e0
style Scrape fill:#ffebee
style Done fill:#e8f5e9
Database Operations:
- Read: sync_profile, sync_task, kpn_import_queue
- Write: call_detail_record, subscription, invoice
- Update: totals, totals_backup, sync_task.status
Monitoring Points
| Component | What to Monitor | Alert Threshold |
|---|---|---|
| sync:manager | Cron execution | Miss 2 consecutive runs |
| MessageConsumedHandler | Exception rate | > 1% of messages |
| SyncProfileHandler | Routing errors | Any routing failure |
| sync_task table | Failed status | > 10% failure rate |
| Import queues | Queue depth | > 1000 pending |
| Totals calculations | Duration | > 5 minutes |
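The thresholds in the table can be expressed as a simple rule check. The sketch below assumes the metrics have already been collected into an array; the metric key names and the `evaluateAlerts()` function are hypothetical, and only the threshold values come from the table.

```php
<?php
declare(strict_types=1);

/**
 * Returns the components whose metric crosses its alert threshold.
 *
 * @param array{missedRuns: int, handlerExceptionRate: float, routingErrors: int,
 *              failedTaskRate: float, queueDepth: int, totalsSeconds: float} $m
 * @return string[] component names, in table order
 */
function evaluateAlerts(array $m): array
{
    $rules = [
        'sync:manager'           => $m['missedRuns'] >= 2,             // miss 2 consecutive runs
        'MessageConsumedHandler' => $m['handlerExceptionRate'] > 0.01, // > 1% of messages
        'SyncProfileHandler'     => $m['routingErrors'] > 0,           // any routing failure
        'sync_task table'        => $m['failedTaskRate'] > 0.10,       // > 10% failure rate
        'Import queues'          => $m['queueDepth'] > 1000,           // > 1000 pending
        'Totals calculations'    => $m['totalsSeconds'] > 300,         // > 5 minutes
    ];

    // array_filter keeps only the rules that evaluated to true.
    return array_keys(array_filter($rules));
}

$alerts = evaluateAlerts([
    'missedRuns' => 2,
    'handlerExceptionRate' => 0.004,
    'routingErrors' => 0,
    'failedTaskRate' => 0.25,
    'queueDepth' => 1000,
    'totalsSeconds' => 310.0,
]);

print_r($alerts);
```

With these sample metrics, three components alert: `sync:manager`, the `sync_task` table, and totals calculations. A queue depth of exactly 1000 does not alert because the table's threshold is strictly greater than 1000.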
Extension Points
Want to add new automation? Hook in at these points:
- New Provider: Create a handler implementing the sync pattern
- New Command: Add to src/MessageBus/Command/
- New Handler: Add to src/MessageBus/AsynchronousHandler/
- New Workflow: Chain existing commands
- New Cron: Add a console command and schedule it in crontab