Skip to content

CDR Processing Complete Lifecycle

Complexity: Very High (6-phase workflow)
Duration: 5-10 minutes for a typical run (see Performance Metrics); up to 30 minutes at high CDR volume
Use Case: Routit mobile CDR import and billing

Most Complex Workflow

This is the longest workflow in the system with 6 distinct phases.


Complete 6-Phase Flow

flowchart TD
    subgraph Phase1["Phase 1: Request CDRs"]
        A[routit:cdr:sync<br/>Console Command] --> B[RoutitRequestCdrsCommand]
        B --> C[RoutitRequestCdrsHandler]
        C --> D{CDRs Ready?}
        D -->|No| E[Create request record<br/>Exit - retry later]
        D -->|Yes| F[RoutitDownloadCdrsCommand]
    end

    subgraph Phase2["Phase 2: Download CDRs"]
        F --> G[RoutitDownloadCdrsHandler]
        G --> H[Download CDR files<br/>from Routit API]
        H --> I[Store files locally<br/>/var/www/expensis/storage/routit/cdrs/]
        I --> J[RoutitImportCdrsCommand]
    end

    subgraph Phase3["Phase 3: Import CDRs"]
        J --> K[RoutitImportCdrsHandler]
        K --> L[Parse CSV files]
        L --> M[Validate data]
        M --> N[Bulk insert CDRs]
        N --> O[Link to devices/customers]
    end

    subgraph Phase4["Phase 4: Generate Invoice"]
        O --> P[RoutitInvoiceCommand<br/>Triggered separately]
        P --> Q[RoutitInvoiceHandler]
        Q --> R[Generate Routit invoice]
        R --> S[CalculateTotalsCommand]
    end

    subgraph Phase5["Phase 5: Calculate Totals"]
        S --> T[CalculateTotalsHandler]
        T --> U[Calculate billing totals<br/>from all CDRs]
        U --> V[Update totals table]
        V --> W[BillingInsightTotalsCommand]
    end

    subgraph Phase6["Phase 6: Finalize"]
        W --> X[BillingInsightTotalsHandler]
        X --> Y[Generate billing insights]
        Y --> Z[SaveStatusCommand]
        Z --> AA[SaveStatusHandler]
        AA --> AB[Update sync task status]
    end

    style A fill:#e3f2fd
    style H fill:#ffebee
    style N fill:#fff3e0
    style U fill:#e8f5e9
    style AB fill:#c8e6c9

Phase 1: Request CDRs from Routit

Sequence Diagram

sequenceDiagram
    participant Cron
    participant Console as routit:cdr:sync
    participant Request as RoutitRequestCdrsCommand
    participant Handler as RoutitRequestCdrsHandler
    participant API as Routit API
    participant DB as Database

    Cron->>Console: Schedule or manual
    Console->>Request: Create command
    Request->>Handler: Handle
    Handler->>API: POST /api/cdr/request
    API-->>Handler: request_id
    Handler->>DB: Store request record
    Handler->>API: GET /api/cdr/status/{id}
    API-->>Handler: status: ready/pending

    alt CDRs Ready
        Handler->>Request: Dispatch RoutitDownloadCdrsCommand
    else CDRs Not Ready
        Handler->>DB: Mark as pending
        Handler-->>Console: Exit (retry later)
    end

Database Operations

-- Create import queue record
INSERT INTO import_queue
(sync_credential_id, date, type, locked, complete, cycle_start_date, created_at)
VALUES (1, NOW(), 'routit_cdr', 0, 0, '2025-01-01', NOW());

Code

// RoutitRequestCdrsHandler.php
/**
 * Requests a CDR export from Routit, stores a request record for it, and
 * dispatches the download command when the export is reported as ready.
 *
 * NOTE(review): $customer and $billingCycle are used below but never assigned
 * in this excerpt — presumably they are read from $command; confirm against
 * the real handler.
 *
 * @param RoutitRequestCdrsCommand $command Message that triggered this handler.
 */
public function __invoke(RoutitRequestCdrsCommand $command) {
    // Ask Routit to prepare the export; the returned ID is stored on the
    // request record and used for the status check further down.
    $requestId = $this->routitApi->requestCdrs(
        $customer,
        $billingCycle
    );

    // Record the request as 'pending' before checking whether it is ready.
    $request = new RoutitCdrRequest();
    $request->setCustomer($customer);
    $request->setBillingCycle($billingCycle);
    $request->setRequestId($requestId);
    $request->setStatus('pending');

    $this->entityManager->persist($request);
    $this->entityManager->flush();

    // Check if ready immediately
    $status = $this->routitApi->checkStatus($requestId);

    // Only a 'ready' status triggers the download; for any other status the
    // method returns with the stored record still marked 'pending'.
    if ($status === 'ready') {
        // The download command is given the local record ID, not the Routit request ID.
        $downloadCommand = new RoutitDownloadCdrsCommand(
            $request->getId()
        );
        $this->commandBus->dispatch($downloadCommand);
    }
}

Phase 2: Download CDR Files

Flow

flowchart LR
    A[RoutitDownloadCdrsCommand] --> B[RoutitDownloadCdrsHandler]
    B --> C[Call Routit API]
    C --> D[Download ZIP file]
    D --> E[Extract CSV files]
    E --> F[Store in filesystem]
    F --> G[RoutitImportCdrsCommand]

    style D fill:#ffebee
    style F fill:#fff3e0

File Storage

/var/www/expensis/storage/routit/cdrs/
├── customer-123/
   └── 2025-01/
       ├── cdrs.csv         # Voice/data CDRs
       ├── sms.csv          # SMS CDRs
       └── metadata.json    # Import metadata

Code

// RoutitDownloadCdrsHandler.php
/**
 * Downloads the CDR archive for a stored request, unpacks it into the
 * request's storage directory, and dispatches the import command.
 *
 * @param RoutitDownloadCdrsCommand $command Carries the ID of the stored request record.
 */
public function __invoke(RoutitDownloadCdrsCommand $command) {
    $cdrRequest = $this->routitCdrRequestRepository->find($command->getRequestId());

    // Fetch the archive from Routit using the request ID stored on the record.
    $archive = $this->routitApi->downloadCdrs($cdrRequest->getRequestId());

    // The target directory is derived from the record's customer and billing cycle.
    $targetDir = $this->getStoragePath(
        $cdrRequest->getCustomer(),
        $cdrRequest->getBillingCycle()
    );
    $this->fileSystem->extractZip($archive, $targetDir);

    // Record the new state and where the files were extracted.
    $cdrRequest->setStatus('downloaded');
    $cdrRequest->setFilePath($targetDir);
    $this->entityManager->flush();

    // Hand over to the import step.
    $this->commandBus->dispatch(
        new RoutitImportCdrsCommand($cdrRequest->getId())
    );
}

Phase 3: Import CDRs into Database

Import Process

flowchart TD
    A[RoutitImportCdrsCommand] --> B[RoutitImportCdrsHandler]
    B --> C[Read CSV files]
    C --> D[Parse each row]
    D --> E{Validate}
    E -->|Valid| F[Create CDR entity]
    E -->|Invalid| G[Log error, skip]
    F --> H[Match to device]
    H --> I[Match to customer]
    I --> J[Batch insert 1000 at a time]
    J --> K{More rows?}
    K -->|Yes| D
    K -->|No| L[Update import status]

    style C fill:#e3f2fd
    style J fill:#fff3e0
    style L fill:#e8f5e9

Bulk Insert Performance

// RoutitImportCdrsHandler.php
/**
 * Imports the downloaded cdrs.csv for a stored request in batches and records
 * the outcome on the request.
 *
 * Rows are converted to CDRs and inserted 1000 at a time. A row that raises a
 * ValidationException is logged as a warning and skipped. After every full
 * batch the EntityManager is cleared to release memory.
 *
 * @param RoutitImportCdrsCommand $command Carries the ID of the stored request record.
 */
public function __invoke(RoutitImportCdrsCommand $command) {
    $request = $this->routitCdrRequestRepository->find($command->getRequestId());
    $filePath = $request->getFilePath();

    $csvReader = new CsvReader($filePath . '/cdrs.csv');
    $batchSize = 1000;
    $batch = [];
    $totalImported = 0;

    foreach ($csvReader->getRows() as $row) {
        try {
            $cdr = $this->createCdrFromRow($row);
            $batch[] = $cdr;

            if (count($batch) >= $batchSize) {
                $this->bulkInsert($batch);
                $totalImported += count($batch);
                $batch = [];

                // Clear memory. Note: this detaches every managed entity,
                // including $request loaded above.
                $this->entityManager->clear();
                gc_collect_cycles();
            }
        } catch (ValidationException $e) {
            $this->logger->warning("Invalid CDR row: {$e->getMessage()}");
        }
    }

    // Insert remaining
    if (!empty($batch)) {
        $this->bulkInsert($batch);
        $totalImported += count($batch);
    }

    // Re-load the request: after clear() the instance fetched at the top is
    // detached, so changes made to it would not be written by flush().
    // When no batch was flushed this returns the same managed instance.
    $request = $this->routitCdrRequestRepository->find($command->getRequestId());

    $request->setStatus('imported');
    $request->setTotalImported($totalImported);
    $this->entityManager->flush();
}

Database Impact

-- Typical import volume
INSERT INTO call_detail_record 
(customer_id, device_id, call_date, duration, cost, ...)
VALUES 
  (123, 456, '2025-01-01 10:00:00', 120, 0.50, ...),
  (123, 457, '2025-01-01 10:05:00', 300, 1.25, ...),
  ... -- Repeat 10,000-50,000 times

Performance: ~5,000-10,000 CDRs per minute (10,000-50,000 CDRs in the 2-5 minute import phase)


Phase 4: Generate Routit Invoice

Trigger

# Console command (manual or scheduled)
php bin/console routit:invoice --customer=123 --cycle=2025-01-01

Flow

// RoutitInvoiceCommand (Console)
/**
 * Console entry point: reads the customer and billing-cycle inputs and
 * dispatches the invoice message onto the command bus.
 *
 * Documented invocation:
 *   php bin/console routit:invoice --customer=123 --cycle=2025-01-01
 *
 * @param InputInterface  $input  Console input holding --customer and --cycle.
 * @param OutputInterface $output Console output (not written to here).
 *
 * @return int Exit code; 0 on success.
 */
public function execute(InputInterface $input, OutputInterface $output) {
    // --customer / --cycle are passed as options in the documented invocation,
    // so they are read with getOption(); getArgument() only resolves
    // positional arguments.
    // NOTE(review): assumes configure() declares both as options — confirm.
    $customerId = $input->getOption('customer');
    $cycleStartDate = new \DateTime($input->getOption('cycle'));

    $command = new RoutitInvoiceCommand($customerId, $cycleStartDate);
    $this->commandBus->dispatch($command);

    // Explicit integer exit code for the console runner.
    return 0;
}

// RoutitInvoiceHandler (MessageBus)
/**
 * Generates and stores the Routit invoice for a customer and cycle, then
 * dispatches the totals calculation with the insights and status commands
 * chained behind it.
 *
 * @param RoutitInvoiceCommand $command Carries the customer ID and cycle start date.
 */
public function __invoke(RoutitInvoiceCommand $command) {
    $customerId = $command->getCustomerId();
    $cycleStart = $command->getCycleStartDate();

    // Build the invoice from the imported CDRs and store it first.
    $invoice = $this->invoiceGenerator->generate($customerId, $cycleStart);
    $this->entityManager->persist($invoice);
    $this->entityManager->flush();

    // Totals calculation is the message that actually gets dispatched ...
    $totalsCommand = new CalculateTotalsCommand($customerId, $cycleStart);

    // ... with the insights and status commands attached as its chain.
    $followUps = [
        new BillingInsightTotalsCommand($customerId, $cycleStart),
        new SaveStatusCommand($customerId),
    ];
    $totalsCommand->withChain($followUps);

    $this->commandBus->dispatch($totalsCommand);
}

Phase 5: Calculate Billing Totals

Calculation Logic

graph TD
    A[CalculateTotalsCommand] --> B[CalculateTotalsHandler]
    B --> C[Query all CDRs for period]
    C --> D[Group by subscription/device]
    D --> E[Apply rate plans]
    E --> F[Calculate costs]
    F --> G[Aggregate totals]
    G --> H[Update totals table]

    style C fill:#e3f2fd
    style F fill:#fff3e0
    style H fill:#e8f5e9

Query Example

-- Calculate totals from CDRs
-- Per-subscription, per-call-type duration, cost and call count for one
-- customer within one billing month (half-open date range).
-- The customer key is taken from cdr.customer_id: the join and filter below
-- identify customers by customer.id, so cdr.customer_id is the column the
-- query itself shows to exist.
SELECT
    cdr.customer_id,
    d.subscription_id,
    SUM(cdr.duration) as total_duration,
    SUM(cdr.cost) as total_cost,
    COUNT(*) as call_count,
    cdr.call_type
FROM call_detail_record cdr
JOIN device d ON cdr.device_id = d.id
JOIN customer c ON cdr.customer_id = c.id
WHERE cdr.call_date >= '2025-01-01'
  AND cdr.call_date < '2025-02-01'
  AND c.id = 123
GROUP BY cdr.customer_id, d.subscription_id, cdr.call_type;

Phase 6: Generate Billing Insights & Save

Billing Insights

// BillingInsightTotalsHandler.php
/**
 * Loads the stored totals for a customer and cycle, derives the insight
 * figures from them, and persists the result as a BillingInsight.
 *
 * @param BillingInsightTotalsCommand $command Carries the customer ID and cycle start date.
 */
public function __invoke(BillingInsightTotalsCommand $command) {
    $customerId = $command->getCustomerId();
    $cycleStart = $command->getCycleStartDate();

    $totals = $this->totalsRepository->findByCustomerAndCycle($customerId, $cycleStart);

    // Every figure below is derived from the same totals object.
    $insights = [];
    $insights['total_cost'] = $totals->getTotalCost();
    $insights['cost_vs_previous_month'] = $this->calculateChange($totals);
    $insights['top_spenders'] = $this->getTopSpenders($totals);
    $insights['cost_by_category'] = $this->groupByCategory($totals);
    $insights['anomalies'] = $this->detectAnomalies($totals);

    // Store the insight payload alongside the customer and cycle it belongs to.
    $billingInsight = new BillingInsight();
    $billingInsight->setCustomer($customerId);
    $billingInsight->setCycleStartDate($cycleStart);
    $billingInsight->setInsights($insights);

    $this->entityManager->persist($billingInsight);
    $this->entityManager->flush();
}

Save Status

// SaveStatusHandler.php
/**
 * Stamps the customer with the current time and marks its calculation as
 * completed.
 *
 * @param SaveStatusCommand $command Carries the customer ID.
 */
public function __invoke(SaveStatusCommand $command) {
    $customerId = $command->getCustomerId();
    $customer = $this->customerRepository->find($customerId);

    // Timestamp is taken after the lookup, at the moment the status is written.
    $finishedAt = new \DateTime();
    $customer->setLastCalculatedAt($finishedAt);
    $customer->setCalculationStatus('completed');

    $this->entityManager->flush();
}

Performance Metrics

Typical Execution Times

| Phase        | Duration      | Bottleneck                    |
|--------------|---------------|-------------------------------|
| 1. Request   | 5-10 seconds  | API response time             |
| 2. Download  | 30-60 seconds | Network bandwidth             |
| 3. Import    | 2-5 minutes   | Database insert speed         |
| 4. Invoice   | 10-30 seconds | Invoice generation logic      |
| 5. Calculate | 1-3 minutes   | Complex aggregation queries   |
| 6. Insights  | 10-20 seconds | Analysis calculations         |
| Total        | 5-10 minutes  | Import step (3) is bottleneck |

Optimization Tips

// Use batch inserts
$this->entityManager->flush(); // Every 1000 records
$this->entityManager->clear(); // Clear identity map
gc_collect_cycles(); // Free memory

// Use raw SQL for bulk operations
$this->connection->executeStatement(
    "INSERT INTO call_detail_record (...) VALUES (...)",
    $batch
);

// Index optimization
CREATE INDEX idx_cdr_customer_date ON call_detail_record (customer_id, call_date);
CREATE INDEX idx_cdr_device ON call_detail_record (device_id);

Error Handling

Phase 1 Errors

catch (RoutitApiException $e) {
    // API unavailable
    $this->logger->error("Routit API error: {$e->getMessage()}");
    // Will retry on next scheduled run
}

Phase 2 Errors

catch (FileDownloadException $e) {
    // Download failed
    $request->setStatus('download_failed');
    $request->setError($e->getMessage());
    $this->entityManager->flush();
}

Phase 3 Errors

catch (CsvParseException $e) {
    // Malformed CSV
    $this->logger->error("CSV parse error at line {$e->getLine()}");
    $request->setStatus('import_failed');
    $request->setPartialImport(true);
    // Continue with successfully imported CDRs
}

Monitoring Queries

-- Check import progress
-- Lists Routit CDR rows in import_queue that are currently locked
-- (locked = 1), newest first.
-- NOTE(review): request_id, customer_id and updated_at are not among the
-- columns written by the import_queue INSERT shown under "Database
-- Operations" (which uses sync_credential_id); confirm they exist.
SELECT 
    request_id,
    customer_id,
    cycle_start_date as billing_cycle,
    locked,
    complete,
    created_at,
    updated_at
FROM import_queue
WHERE type = 'routit_cdr'
AND locked = 1
ORDER BY created_at DESC;

-- Check CDR volume
-- Daily CDR count and summed cost per customer for all records dated
-- 2025-01-01 or later, most recent day first.
SELECT 
    customer_id,
    DATE(call_date) as date,
    COUNT(*) as cdr_count,
    SUM(cost) as total_cost
FROM call_detail_record
WHERE call_date >= '2025-01-01'
GROUP BY customer_id, DATE(call_date)
ORDER BY date DESC;


Key Takeaways

Complete CDR Lifecycle

  1. 6 distinct phases from request to insights
  2. Longest workflow in the system (5-10 minutes)
  3. Bulk processing handles 10k-50k CDRs
  4. Async execution via MessageBus
  5. Chain execution ensures proper sequencing
  6. Error recovery at each phase

Performance Considerations

  • Import phase is the bottleneck
  • Use batch inserts (1000 at a time)
  • Clear EntityManager regularly
  • Monitor memory usage
  • Index optimization is critical