Key Concepts¶
Core concepts you need to understand to work with the Expensis automation system.
MessageBus Architecture¶
What is the MessageBus?¶
The MessageBus is the central nervous system of Expensis automation. It enables asynchronous, decoupled communication between components.
sequenceDiagram
participant Trigger
participant MessageBus
participant Handler
participant Event
Trigger->>MessageBus: dispatch(Command)
MessageBus->>Handler: Route to handler
Handler->>Handler: Execute business logic
Handler-->>Event: Fire WorkerMessageHandledEvent
Event->>Event: MessageConsumedHandler checks chain
alt Has chained commands
Event->>MessageBus: dispatch(NextCommand)
end
Key Benefits¶
Non-blocking - UI doesn't wait for long operations Parallel execution - Multiple commands process simultaneously Decoupled - Components don't need to know about each other Testable - Easy to test handlers in isolation Scalable - Add workers to process more commands
Commands & Handlers¶
Command¶
An object representing an intent to do something.
class CalculateTotalsCommand
{
private int $customerId;
private \DateTime $cycleStartDate;
public function __construct(int $customerId, \DateTime $cycleStartDate)
{
$this->customerId = $customerId;
$this->cycleStartDate = $cycleStartDate;
}
// Getters...
}
Characteristics: - Immutable (data doesn't change after creation) - Contains only data, no business logic - Named as imperatives (CalculateTotals, SyncProfile)
Handler¶
A class that executes the command's intent.
#[AsMessageHandler]
class CalculateTotalsHandler
{
public function __invoke(CalculateTotalsCommand $command): void
{
// Business logic here
$this->calculateTotals(
$command->getCustomerId(),
$command->getCycleStartDate()
);
}
}
Characteristics:
- One handler per command (1:1 mapping)
- Contains all business logic
- Tagged with #[AsMessageHandler]
Command → Handler Flow¶
Controller/Service/Console
↓ creates
Command object
↓ dispatches to
MessageBus
↓ routes to
Handler
↓ executes
Business Logic
↓ may dispatch
More Commands (chain)
Command Chaining¶
What is a Chain?¶
A chain is a sequence of commands that execute one after another automatically.
$command = new SyncCommand(...);
$command->withChain([
new CalculateTotalsCommand(...), // Runs after sync
new BackupTotalsCommand(...), // Runs after calculate
new SaveStatusCommand(...) // Runs after backup
]);
$messageBus->dispatch($command);
How Chains Work¶
- Dispatch initial command
- Handler processes command
- Event
WorkerMessageHandledEventfires - MessageConsumedHandler detects chain
- Dispatch next command in chain
- Repeat until chain complete
graph LR
A[Command A] --> B[Handler A]
B --> C{Has Chain?}
C -->|Yes| D[Command B]
C -->|No| E[Done]
D --> F[Handler B]
F --> G{Has Chain?}
G -->|Yes| H[Command C]
G -->|No| E
H --> I[Handler C]
I --> E
style C fill:#ffebee
style G fill:#ffebee
Why Chaining?¶
Sequential execution guaranteed Error handling at each step Flexible workflows easy to modify Reusable handlers in different chains
The Critical Three¶
1. MessageConsumedHandler¶
Purpose: Enables ALL command chaining
How it works:
class MessageConsumedHandler implements EventSubscriberInterface
{
public function onWorkerMessageHandled(WorkerMessageHandledEvent $event)
{
$envelope = $event->getEnvelope();
$message = $envelope->getMessage();
// Check if command has a chain
if ($message instanceof ChainableCommandInterface) {
foreach ($message->getChain() as $chainedCommand) {
$this->messageBus->dispatch($chainedCommand);
}
}
}
}
Impact: If this breaks, ZERO workflows with chains work
2. SyncProfileHandler¶
Purpose: Master router for ALL provider syncs
How it works:
public function __invoke(SyncProfileCommand $command)
{
$syncType = $command->getSyncProfile()->getSyncType();
$syncCommand = match($syncType) {
SyncType::KPN_SP16 => new KpnSp16SyncCommand(...),
SyncType::VODAFONE => new VodafoneSyncCommand(...),
SyncType::T_MOBILE => new TMobileCalviSyncCommand(...),
// ...
};
$this->messageBus->dispatch($syncCommand);
}
Impact: If this breaks, ALL automated syncs stop
3. sync:manager Cron¶
Purpose: Triggers ALL automated syncs
Schedule: */5 * * * * (every 5 minutes)
What it does: 1. Query active SyncProfiles 2. Check which need syncing 3. Dispatch SyncProfileCommand for each 4. SyncProfileHandler routes to providers
Impact: If this stops, NO automation runs
Sync Profiles & Tasks¶
SyncProfile¶
Database record defining HOW to sync a provider for a customer.
| Field | Purpose |
|---|---|
customer_id |
Which customer |
sync_type |
Which provider (KPN_SP16, VODAFONE, etc.) |
active |
Is sync enabled? |
next_sync_date |
When to sync next |
credentials |
Encrypted login credentials |
SyncTask¶
Database record tracking a single sync execution.
| Field | Purpose |
|---|---|
sync_profile_id |
Which sync profile |
cycle_start_date |
Billing cycle being synced |
status |
pending, processing, completed, failed |
created_at |
When sync started |
updated_at |
Last status update |
error_message |
Error details if failed |
Status Flow¶
pending → processing → completed
pending → processing → failed
↓
invalid_credentials
invoice_not_yet_available
unknown_error
CDRs (Call Detail Records)¶
What is a CDR?¶
A record of a single telecom event: - Voice call: who, when, duration, cost - SMS: who, when, cost - Data session: amount, when, cost
CDR Structure¶
| Field | Example | Description |
|---|---|---|
customer_id |
123 | Which customer |
device_id |
456 | Which device/number |
call_date |
2025-01-15 10:30:00 | When it happened |
call_type |
voice | voice, sms, data |
destination |
+31612345678 | Called number |
duration |
120 | Seconds (for voice) |
cost |
0.50 | Cost in euros |
CDR Lifecycle¶
1. IMPORT → Sync from provider API
2. VALIDATE → Check data integrity
3. LINK → Connect to device/subscription
4. CALCULATE → Apply rate plans, compute costs
5. AGGREGATE → Sum into totals
6. BACKUP → Save for history
Totals Calculation¶
What are Totals?¶
Aggregated billing data for a customer and billing cycle.
Calculation Process¶
graph TD
A[Query all CDRs for period] --> B[Group by subscription]
B --> C[Group by call type]
C --> D[Apply rate plans]
D --> E[Calculate costs]
E --> F[Sum totals]
F --> G[Update totals table]
G --> H[Generate insights]
style D fill:#fff3e0
style E fill:#e8f5e9
Rate Plans¶
Pricing rules that determine CDR costs:
Rate Plan: "Business Mobile 100"
- First 100 minutes: €0.00
- Additional minutes: €0.10/min
- SMS: €0.05 each
- Data: €0.05/MB
Totals Breakdown¶
SELECT
subscription_id,
call_type,
SUM(duration) as total_duration,
SUM(cost) as total_cost,
COUNT(*) as call_count
FROM call_detail_record
WHERE customer_id = 123
AND call_date >= '2025-01-01'
AND call_date < '2025-02-01'
GROUP BY subscription_id, call_type;
Common Patterns¶
Router Pattern¶
Handler that forwards to other handlers based on context.
Example: DispatchTotalsHandler routes to: - CalculateTotalsCommand (standard) - CalculateTotalsRateplanCommand (with rate plans)
Chain Pattern¶
Command that triggers follow-up commands.
Example: SyncCommand chains: 1. Import data 2. Calculate totals 3. Backup totals 4. Save status
Queue Pattern¶
Commands added to queue for later processing.
Example: KPN SP16 import queue 1. Sync creates import queue record 2. Handler processes queue 3. Imports data 4. Marks queue record complete
Understanding Workflows¶
Workflow = Commands + Handlers + Chains¶
A workflow is the complete execution path from trigger to completion.
Example: KPN SP16 Sync Workflow
Trigger: sync:manager cron
↓
SyncProfileCommand
↓
SyncProfileHandler (router)
↓
KpnSp16SyncCommand
↓
KpnSp16SyncHandler
↓ (has chain)
KpnSp16ImportCommand
↓
KpnSp16ImportHandler
↓ (has chain)
DispatchTotalsCommand
↓
DispatchTotalsHandler (router)
↓
CalculateTotalsCommand
↓
CalculateTotalsHandler
↓ (has chain)
BillingInsightCommand
↓
Result: Complete sync with totals
See: Complete Workflows →
Error Handling¶
Error States¶
| State | Meaning | Action |
|---|---|---|
invalid_credentials |
Login failed | Update credentials |
invoice_not_yet_available |
Too early in cycle | Retry automatically |
timeout |
API slow/unavailable | Retry |
unknown_error |
Unexpected failure | Check logs, investigate |
Retry Logic¶
if ($syncTask->getRetries() < 5) {
// Retry
$syncTask->incrementRetries();
$this->dispatch($command);
} else {
// Give up, calculate with existing data
$this->calculateTotalsAnyway();
}
Next Steps¶
Now that you understand the key concepts:
- See them in action: Master Sync Workflow →
- Explore components: All Handlers →
- Debug issues: Troubleshooting Guide →
Questions? Check the FAQ →