CDR Processing Complete Lifecycle
Complexity: Very High (6-phase workflow)
Duration: 10-30 minutes end to end, depending on volume (roughly 5-10 minutes of active processing; see Performance Metrics)
Use Case: Routit mobile CDR import and billing
Most Complex Workflow
This is the longest workflow in the system with 6 distinct phases.
Complete 6-Phase Flow
flowchart TD
subgraph Phase1["Phase 1: Request CDRs"]
A[routit:cdr:sync<br/>Console Command] --> B[RoutitRequestCdrsCommand]
B --> C[RoutitRequestCdrsHandler]
C --> D{CDRs Ready?}
D -->|No| E[Create request record<br/>Exit - retry later]
D -->|Yes| F[RoutitDownloadCdrsCommand]
end
subgraph Phase2["Phase 2: Download CDRs"]
F --> G[RoutitDownloadCdrsHandler]
G --> H[Download CDR files<br/>from Routit API]
H --> I[Store files locally<br/>/var/www/cdrs/routit/]
I --> J[RoutitImportCdrsCommand]
end
subgraph Phase3["Phase 3: Import CDRs"]
J --> K[RoutitImportCdrsHandler]
K --> L[Parse CSV files]
L --> M[Validate data]
M --> N[Bulk insert CDRs]
N --> O[Link to devices/customers]
end
subgraph Phase4["Phase 4: Generate Invoice"]
O --> P[RoutitInvoiceCommand<br/>Triggered separately]
P --> Q[RoutitInvoiceHandler]
Q --> R[Generate Routit invoice]
R --> S[CalculateTotalsCommand]
end
subgraph Phase5["Phase 5: Calculate Totals"]
S --> T[CalculateTotalsHandler]
T --> U[Calculate billing totals<br/>from all CDRs]
U --> V[Update totals table]
V --> W[BillingInsightTotalsCommand]
end
subgraph Phase6["Phase 6: Finalize"]
W --> X[BillingInsightTotalsHandler]
X --> Y[Generate billing insights]
Y --> Z[SaveStatusCommand]
Z --> AA[SaveStatusHandler]
AA --> AB[Update sync task status]
end
style A fill:#e3f2fd
style H fill:#ffebee
style N fill:#fff3e0
style U fill:#e8f5e9
style AB fill:#c8e6c9
Phase 1: Request CDRs from Routit
Sequence Diagram
sequenceDiagram
participant Cron
participant Console as routit:cdr:sync
participant Request as RoutitRequestCdrsCommand
participant Handler as RoutitRequestCdrsHandler
participant API as Routit API
participant DB as Database
Cron->>Console: Schedule or manual
Console->>Request: Create command
Request->>Handler: Handle
Handler->>API: POST /api/cdr/request
API-->>Handler: request_id
Handler->>DB: Store request record
Handler->>API: GET /api/cdr/status/{id}
API-->>Handler: status: ready/pending
alt CDRs Ready
Handler->>Request: Dispatch RoutitDownloadCdrsCommand
else CDRs Not Ready
Handler->>DB: Mark as pending
Handler-->>Console: Exit (retry later)
end
Database Operations
-- Create import queue record
INSERT INTO import_queue
(sync_credential_id, date, type, locked, complete, cycle_start_date, created_at)
VALUES (1, NOW(), 'routit_cdr', 0, 0, '2025-01-01', NOW());
Code
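// RoutitRequestCdrsHandler.php
public function __invoke(RoutitRequestCdrsCommand $command): void
{
    // Assumed accessors: the source does not show how the handler obtains these values
    $customer = $command->getCustomer();
    $billingCycle = $command->getBillingCycle();

    // Ask Routit to prepare the CDR export; the API returns a request ID
    $requestId = $this->routitApi->requestCdrs($customer, $billingCycle);

    // Persist the request so a later run can pick it up if it is still pending
    $request = new RoutitCdrRequest();
    $request->setCustomer($customer);
    $request->setBillingCycle($billingCycle);
    $request->setRequestId($requestId);
    $request->setStatus('pending');
    $this->entityManager->persist($request);
    $this->entityManager->flush();

    // Check if ready immediately
    $status = $this->routitApi->checkStatus($requestId);
    if ($status === 'ready') {
        $downloadCommand = new RoutitDownloadCdrsCommand($request->getId());
        $this->commandBus->dispatch($downloadCommand);
    }
    // Otherwise the request stays 'pending' and is retried on a later run
}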
Phase 2: Download CDR Files
Flow
flowchart LR
A[RoutitDownloadCdrsCommand] --> B[RoutitDownloadCdrsHandler]
B --> C[Call Routit API]
C --> D[Download ZIP file]
D --> E[Extract CSV files]
E --> F[Store in filesystem]
F --> G[RoutitImportCdrsCommand]
style D fill:#ffebee
style F fill:#fff3e0
File Storage
/var/www/expensis/storage/routit/cdrs/
├── customer-123/
│   └── 2025-01/
│       ├── cdrs.csv          # Voice/data CDRs
│       ├── sms.csv           # SMS CDRs
│       └── metadata.json     # Import metadata
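The layout above suggests that the target directory can be derived from the customer ID and the billing cycle's start month. The following is a minimal sketch of that derivation, not the handler's actual `getStoragePath()`, which the source does not show. The function name `buildCdrStoragePath` and its parameters are assumptions made for illustration.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: builds "<base>/customer-<id>/<YYYY-MM>" for a billing cycle.
 * The name and signature are assumptions; only the directory layout comes from the docs.
 */
function buildCdrStoragePath(string $baseDir, int $customerId, DateTimeInterface $cycleStart): string
{
    return sprintf(
        '%s/customer-%d/%s',
        rtrim($baseDir, '/'),          // tolerate a trailing slash on the base directory
        $customerId,
        $cycleStart->format('Y-m')     // one directory per billing month
    );
}

$path = buildCdrStoragePath(
    '/var/www/expensis/storage/routit/cdrs/',
    123,
    new DateTimeImmutable('2025-01-01')
);
echo $path, PHP_EOL; // /var/www/expensis/storage/routit/cdrs/customer-123/2025-01
```

Keeping the path derivation in one pure function would make it easy to unit test and would let the download and import phases agree on where files live.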
Code
// RoutitDownloadCdrsHandler.php
public function __invoke(RoutitDownloadCdrsCommand $command): void
{
    $request = $this->routitCdrRequestRepository->find($command->getRequestId());
    if ($request === null) {
        // Nothing to download if the request record no longer exists
        $this->logger->error("CDR request {$command->getRequestId()} not found");
        return;
    }

    // Download ZIP from Routit API
    $zipContent = $this->routitApi->downloadCdrs($request->getRequestId());

    // Extract files
    $extractPath = $this->getStoragePath($request->getCustomer(), $request->getBillingCycle());
    $this->fileSystem->extractZip($zipContent, $extractPath);

    // Update request status
    $request->setStatus('downloaded');
    $request->setFilePath($extractPath);
    $this->entityManager->flush();

    // Trigger import
    $importCommand = new RoutitImportCdrsCommand($request->getId());
    $this->commandBus->dispatch($importCommand);
}
Phase 3: Import CDRs into Database
Import Process
flowchart TD
A[RoutitImportCdrsCommand] --> B[RoutitImportCdrsHandler]
B --> C[Read CSV files]
C --> D[Parse each row]
D --> E{Validate}
E -->|Valid| F[Create CDR entity]
E -->|Invalid| G[Log error, skip]
F --> H[Match to device]
H --> I[Match to customer]
I --> J[Batch insert 1000 at a time]
J --> K{More rows?}
K -->|Yes| D
K -->|No| L[Update import status]
style C fill:#e3f2fd
style J fill:#fff3e0
style L fill:#e8f5e9
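The validate-then-skip branch in the diagram above can be sketched as a function that either returns a normalized row or throws. This is an illustration under assumptions: the column names (`msisdn`, `call_date`, `duration`, `cost`) are hypothetical because the Routit CSV layout is not shown in the source, and `ValidationException` is declared locally only to keep the example self-contained.

```php
<?php
declare(strict_types=1);

// Stand-in for the ValidationException caught by the import handler.
class ValidationException extends RuntimeException {}

/**
 * Hypothetical row validator: returns a normalized row or throws.
 * The column names are assumptions, not the real Routit CSV layout.
 */
function validateCdrRow(array $row): array
{
    foreach (['msisdn', 'call_date', 'duration', 'cost'] as $field) {
        if (!isset($row[$field]) || trim((string) $row[$field]) === '') {
            throw new ValidationException("Missing field: {$field}");
        }
    }

    // "!" resets unspecified fields, so only the parsed values end up in the date
    $callDate = DateTimeImmutable::createFromFormat('!Y-m-d H:i:s', (string) $row['call_date']);
    if ($callDate === false) {
        throw new ValidationException("Bad call_date: {$row['call_date']}");
    }
    if (!ctype_digit((string) $row['duration'])) {
        throw new ValidationException("Bad duration: {$row['duration']}");
    }
    if (!is_numeric($row['cost']) || (float) $row['cost'] < 0) {
        throw new ValidationException("Bad cost: {$row['cost']}");
    }

    return [
        'msisdn'    => trim((string) $row['msisdn']),
        'call_date' => $callDate,
        'duration'  => (int) $row['duration'],
        'cost'      => (float) $row['cost'],
    ];
}

$rows = [
    ['msisdn' => '31600000001', 'call_date' => '2025-01-01 10:00:00', 'duration' => '120', 'cost' => '0.50'],
    ['msisdn' => '31600000002', 'call_date' => 'not-a-date', 'duration' => '300', 'cost' => '1.25'],
];

$valid = [];
$skipped = 0;
foreach ($rows as $row) {
    try {
        $valid[] = validateCdrRow($row);
    } catch (ValidationException $e) {
        $skipped++; // the real handler logs a warning and moves on to the next row
    }
}
```

Throwing on the first failed check keeps the happy path flat, and the caller decides whether an invalid row is fatal or merely logged and skipped, as the flow above describes.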
Bulk Insert Performance
// RoutitImportCdrsHandler.php
public function __invoke(RoutitImportCdrsCommand $command): void
{
    $request = $this->routitCdrRequestRepository->find($command->getRequestId());
    $filePath = $request->getFilePath();
    $csvReader = new CsvReader($filePath . '/cdrs.csv');
    $batchSize = 1000;
    $batch = [];
    $totalImported = 0;

    foreach ($csvReader->getRows() as $row) {
        try {
            $cdr = $this->createCdrFromRow($row);
            $batch[] = $cdr;
            if (count($batch) >= $batchSize) {
                $this->bulkInsert($batch);
                $totalImported += count($batch);
                $batch = [];
                // Clear memory
                $this->entityManager->clear();
                gc_collect_cycles();
            }
        } catch (ValidationException $e) {
            // Skip the invalid row and keep importing the rest
            $this->logger->warning("Invalid CDR row: {$e->getMessage()}");
        }
    }

    // Insert remaining
    if (!empty($batch)) {
        $this->bulkInsert($batch);
        $totalImported += count($batch);
    }

    // clear() detaches every managed entity, including $request,
    // so reload it before updating its status
    $request = $this->routitCdrRequestRepository->find($command->getRequestId());
    $request->setStatus('imported');
    $request->setTotalImported($totalImported);
    $this->entityManager->flush();
}
Database Impact
-- Typical import volume
INSERT INTO call_detail_record
(customer_id, device_id, call_date, duration, cost, ...)
VALUES
(123, 456, '2025-01-01 10:00:00', 120, 0.50, ...),
(123, 457, '2025-01-01 10:05:00', 300, 1.25, ...),
... -- Repeat 10,000-50,000 times
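Performance: ~5,000-10,000 CDRs per second for the raw bulk insert. Parsing, validation, and device/customer matching make up the rest of the 2-5 minute import time listed under Performance Metrics.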
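A multi-row INSERT like the one above is usually built with positional placeholders and not by concatenating values into the SQL string. The sketch below shows one way to do that. `buildBulkInsert` is a hypothetical helper, not the handler's actual `bulkInsert()`, and the column list is taken from the example statement above.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: builds one multi-row INSERT with positional placeholders.
 *
 * @param string[] $columns
 * @param array<int, array<string, mixed>> $rows
 * @return array{0: string, 1: array<int, mixed>} SQL text and flattened parameters
 */
function buildBulkInsert(string $table, array $columns, array $rows): array
{
    if ($rows === []) {
        throw new InvalidArgumentException('Cannot build an INSERT for zero rows');
    }

    $rowPlaceholder = '(' . implode(', ', array_fill(0, count($columns), '?')) . ')';
    $params = [];
    foreach ($rows as $row) {
        foreach ($columns as $column) {
            $params[] = $row[$column]; // keep parameter order aligned with $columns
        }
    }

    $sql = sprintf(
        'INSERT INTO %s (%s) VALUES %s',
        $table,
        implode(', ', $columns),
        implode(', ', array_fill(0, count($rows), $rowPlaceholder))
    );

    return [$sql, $params];
}

$columns = ['customer_id', 'device_id', 'call_date', 'duration', 'cost'];
$batch = [
    ['customer_id' => 123, 'device_id' => 456, 'call_date' => '2025-01-01 10:00:00', 'duration' => 120, 'cost' => 0.50],
    ['customer_id' => 123, 'device_id' => 457, 'call_date' => '2025-01-01 10:05:00', 'duration' => 300, 'cost' => 1.25],
];

[$sql, $params] = buildBulkInsert('call_detail_record', $columns, $batch);
echo $sql, PHP_EOL;
```

The returned pair could then be passed to something like Doctrine DBAL's `executeStatement($sql, $params)`. The table and column names must come from trusted code, never from user input, since they are interpolated into the SQL text.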
Phase 4: Generate Routit Invoice
Trigger
# Console command (manual or scheduled)
php bin/console routit:invoice --customer=123 --cycle=2025-01-01
Flow
// RoutitInvoiceCommand (Console)
protected function execute(InputInterface $input, OutputInterface $output): int
{
    // --customer and --cycle are passed as options, so read them with getOption()
    $customerId = $input->getOption('customer');
    $cycleStartDate = new \DateTime($input->getOption('cycle'));

    $command = new RoutitInvoiceCommand($customerId, $cycleStartDate);
    $this->commandBus->dispatch($command);

    return Command::SUCCESS;
}

// RoutitInvoiceHandler (MessageBus)
public function __invoke(RoutitInvoiceCommand $command): void
{
    // Generate invoice from imported CDRs
    $invoice = $this->invoiceGenerator->generate(
        $command->getCustomerId(),
        $command->getCycleStartDate()
    );
    $this->entityManager->persist($invoice);
    $this->entityManager->flush();

    // Trigger totals calculation, followed by insights and the status update
    $totalsCommand = new CalculateTotalsCommand(
        $command->getCustomerId(),
        $command->getCycleStartDate()
    );
    $totalsCommand->withChain([
        new BillingInsightTotalsCommand(
            $command->getCustomerId(),
            $command->getCycleStartDate()
        ),
        new SaveStatusCommand($command->getCustomerId()),
    ]);
    $this->commandBus->dispatch($totalsCommand);
}
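The source does not show how `withChain()` is implemented. As an illustration only, the sketch below shows one way such a chain could work: the head command carries its follow-up commands, and a runner executes them in order, stopping if any handler throws. `ChainableCommand` and `runChain` are hypothetical names, the command classes are empty stand-ins, and the code assumes PHP 8.0 or later.

```php
<?php
declare(strict_types=1);

// Hypothetical base class; the real withChain() implementation is not shown in the source.
abstract class ChainableCommand
{
    /** @var ChainableCommand[] */
    private array $chain = [];

    public function withChain(array $commands): static
    {
        $this->chain = $commands;
        return $this;
    }

    /** @return ChainableCommand[] */
    public function getChain(): array
    {
        return $this->chain;
    }
}

// Empty stand-ins for the commands named in the handler above.
final class CalculateTotalsCommand extends ChainableCommand {}
final class BillingInsightTotalsCommand extends ChainableCommand {}
final class SaveStatusCommand extends ChainableCommand {}

/**
 * Runs the head command, then each chained command in order.
 * An exception from $handle stops the chain, so later steps
 * never run on top of a failed one.
 */
function runChain(ChainableCommand $head, callable $handle): void
{
    $handle($head);
    foreach ($head->getChain() as $next) {
        $handle($next);
    }
}

$executed = [];
$command = (new CalculateTotalsCommand())->withChain([
    new BillingInsightTotalsCommand(),
    new SaveStatusCommand(),
]);

runChain($command, function (ChainableCommand $c) use (&$executed): void {
    $executed[] = $c::class; // record execution order in place of real handling
});

print_r($executed);
```

In a real message bus the follow-up commands would more likely be dispatched by middleware after the head command's handler succeeds, but the ordering guarantee is the same.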
Phase 5: Calculate Billing Totals
Calculation Logic
graph TD
A[CalculateTotalsCommand] --> B[CalculateTotalsHandler]
B --> C[Query all CDRs for period]
C --> D[Group by subscription/device]
D --> E[Apply rate plans]
E --> F[Calculate costs]
F --> G[Aggregate totals]
G --> H[Update totals table]
style C fill:#e3f2fd
style F fill:#fff3e0
style H fill:#e8f5e9
Query Example
-- Calculate totals from CDRs
SELECT
    c.id AS customer_id,
    d.subscription_id,
    cdr.call_type,
    SUM(cdr.duration) AS total_duration,
    SUM(cdr.cost) AS total_cost,
    COUNT(*) AS call_count
FROM call_detail_record cdr
JOIN device d ON cdr.device_id = d.id
JOIN customer c ON cdr.customer_id = c.id
WHERE cdr.call_date >= '2025-01-01'
  AND cdr.call_date < '2025-02-01'
  AND c.id = 123
GROUP BY c.id, d.subscription_id, cdr.call_type;
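To make the grouping in the query above concrete, here is a small in-memory equivalent in PHP. It is only a sketch for reasoning about the result shape: the row keys are assumptions, and the production path would leave this work to the database.

```php
<?php
declare(strict_types=1);

/**
 * Groups CDR rows by (subscription_id, call_type) and sums duration and cost,
 * mirroring the GROUP BY in the query above. Row keys are assumptions.
 */
function aggregateCdrs(array $cdrs): array
{
    $totals = [];
    foreach ($cdrs as $cdr) {
        $key = $cdr['subscription_id'] . '|' . $cdr['call_type'];
        // Initialize the bucket the first time this group is seen
        $totals[$key] ??= [
            'subscription_id' => $cdr['subscription_id'],
            'call_type'       => $cdr['call_type'],
            'total_duration'  => 0,
            'total_cost'      => 0.0,
            'call_count'      => 0,
        ];
        $totals[$key]['total_duration'] += $cdr['duration'];
        $totals[$key]['total_cost']     += $cdr['cost'];
        $totals[$key]['call_count']++;
    }
    return array_values($totals);
}

$totals = aggregateCdrs([
    ['subscription_id' => 10, 'call_type' => 'voice', 'duration' => 120, 'cost' => 0.50],
    ['subscription_id' => 10, 'call_type' => 'voice', 'duration' => 300, 'cost' => 1.25],
    ['subscription_id' => 10, 'call_type' => 'sms',   'duration' => 0,   'cost' => 0.10],
]);

print_r($totals);
```

Floats are used here for brevity. Billing code would normally store money as integer minor units or a decimal type to avoid rounding drift.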
Phase 6: Generate Billing Insights & Save
Billing Insights
// BillingInsightTotalsHandler.php
public function __invoke(BillingInsightTotalsCommand $command): void
{
    $totals = $this->totalsRepository->findByCustomerAndCycle(
        $command->getCustomerId(),
        $command->getCycleStartDate()
    );

    // Calculate insights
    $insights = [
        'total_cost' => $totals->getTotalCost(),
        'cost_vs_previous_month' => $this->calculateChange($totals),
        'top_spenders' => $this->getTopSpenders($totals),
        'cost_by_category' => $this->groupByCategory($totals),
        'anomalies' => $this->detectAnomalies($totals),
    ];

    $billingInsight = new BillingInsight();
    $billingInsight->setCustomer($command->getCustomerId());
    $billingInsight->setCycleStartDate($command->getCycleStartDate());
    $billingInsight->setInsights($insights);
    $this->entityManager->persist($billingInsight);
    $this->entityManager->flush();
}
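The helper methods used by the handler above (`calculateChange`, `detectAnomalies`) are not shown in the source. The sketch below illustrates what such calculations might look like. Both function names, the percentage formula, and the "more than three times the median" anomaly rule are assumptions chosen for illustration, not the system's actual logic.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical stand-in for calculateChange(): percentage change of the current
 * cycle's cost against the previous cycle. Returns null when there is no
 * previous cost to compare with, which avoids a division by zero.
 */
function percentChange(float $current, float $previous): ?float
{
    if ($previous == 0.0) {
        return null;
    }
    return round(($current - $previous) / $previous * 100, 1);
}

/**
 * Hypothetical anomaly rule: flag any device whose cost exceeds
 * $factor times the median cost across all devices.
 *
 * @param array<int, float> $costByDevice device ID => cost
 * @return int[] IDs of flagged devices
 */
function detectCostAnomalies(array $costByDevice, float $factor = 3.0): array
{
    if ($costByDevice === []) {
        return [];
    }
    $sorted = array_values($costByDevice);
    sort($sorted);
    $n = count($sorted);
    $mid = intdiv($n, 2);
    $median = $n % 2 === 1
        ? $sorted[$mid]
        : ($sorted[$mid - 1] + $sorted[$mid]) / 2;

    return array_keys(array_filter(
        $costByDevice,
        fn (float $cost): bool => $cost > $factor * $median
    ));
}

$change = percentChange(120.0, 100.0); // 20.0
$anomalies = detectCostAnomalies([456 => 10.0, 457 => 12.0, 458 => 95.0]);
print_r($anomalies); // device 458 is flagged: 95.0 > 3 * 12.0
```

A median-based threshold is less sensitive to a single extreme spender than a mean-based one, which is why it is used in this sketch.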
Save Status
// SaveStatusHandler.php
public function __invoke(SaveStatusCommand $command): void
{
    $customer = $this->customerRepository->find($command->getCustomerId());
    $customer->setLastCalculatedAt(new \DateTime());
    $customer->setCalculationStatus('completed');
    $this->entityManager->flush();
}
Performance Metrics
Typical Execution Times
| Phase | Duration | Bottleneck |
|---|---|---|
| 1. Request | 5-10 seconds | API response time |
| 2. Download | 30-60 seconds | Network bandwidth |
| 3. Import | 2-5 minutes | Database insert speed |
| 4. Invoice | 10-30 seconds | Invoice generation logic |
| 5. Calculate | 1-3 minutes | Complex aggregation queries |
| 6. Insights | 10-20 seconds | Analysis calculations |
| Total | 5-10 minutes | Import step (3) is bottleneck |
Optimization Tips
// Use batch inserts
$this->entityManager->flush(); // Every 1000 records
$this->entityManager->clear(); // Clear identity map
gc_collect_cycles(); // Free memory
// Use raw SQL for bulk operations
$this->connection->executeStatement(
    "INSERT INTO call_detail_record (...) VALUES (...)",
    $batch
);
-- Index optimization
CREATE INDEX idx_cdr_customer_date ON call_detail_record (customer_id, call_date);
CREATE INDEX idx_cdr_device ON call_detail_record (device_id);
Error Handling
Phase 1 Errors
catch (RoutitApiException $e) {
    // API unavailable
    $this->logger->error("Routit API error: {$e->getMessage()}");
    // Will retry on next scheduled run
}
Phase 2 Errors
catch (FileDownloadException $e) {
    // Download failed
    $request->setStatus('download_failed');
    $request->setError($e->getMessage());
    $this->entityManager->flush();
}
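Phase 3 Errors
catch (CsvParseException $e) {
    // Malformed CSV. Exception::getLine() returns the PHP source line where the
    // exception was thrown, not the CSV row, so log the message instead.
    $this->logger->error("CSV parse error: {$e->getMessage()}");
    $request->setStatus('import_failed');
    $request->setPartialImport(true);
    // Continue with successfully imported CDRs
}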
Monitoring Queries
-- Check import progress
SELECT
request_id,
customer_id,
cycle_start_date as billing_cycle,
locked,
complete,
created_at,
updated_at
FROM import_queue
WHERE type = 'routit_cdr'
AND locked = 1
ORDER BY created_at DESC;
-- Check CDR volume
SELECT
customer_id,
DATE(call_date) as date,
COUNT(*) as cdr_count,
SUM(cost) as total_cost
FROM call_detail_record
WHERE call_date >= '2025-01-01'
GROUP BY customer_id, DATE(call_date)
ORDER BY date DESC;
Related Workflows
Key Takeaways
Complete CDR Lifecycle
- 6 distinct phases from request to insights
- Longest workflow in the system (5-10 minutes)
- Bulk processing handles 10k-50k CDRs
- Async execution via MessageBus
- Chain execution ensures proper sequencing
- Error recovery at each phase
Performance Considerations
- Import phase is the bottleneck
- Use batch inserts (1000 at a time)
- Clear EntityManager regularly
- Monitor memory usage
- Index optimization is critical