ESP32 – MCP2515 CAN 통신

송신 TEST

#include <mcp_can.h>
#include <SPI.h>

// HSPI 핀 정의 (ESP32 HSPI: MOSI=13, MISO=12, SCK=14, CS=15 예시)
#define CAN_CS  15   // CS 핀 (원하는 핀으로 변경 가능)
#define CAN_INT 27   // MCP2515 INT 핀

// MCP_CAN 객체 (CS 핀, SPI 포트 지정)
MCP_CAN CAN0(CAN_CS);  

unsigned long lastSend = 0;

void setup() {
  Serial.begin(115200);
  delay(1000);

  // HSPI 사용 설정#include <mcp_can.h>
#include <SPI.h>

// HSPI 핀 정의 (ESP32 HSPI: MOSI=13, MISO=12, SCK=14, CS=15 예시)
#define CAN_CS  15   // CS 핀 (원하는 핀으로 변경 가능)
#define CAN_INT 27   // MCP2515 INT 핀

// MCP_CAN 객체 (CS 핀, SPI 포트 지정)
MCP_CAN CAN0(CAN_CS);  

unsigned long lastSend = 0;
volatile bool mcpIntFlag = false;

void IRAM_ATTR mcpIntISR() {
  // 단순 플래그만 세운다. ISR에서 무거운 작업 금지.
  mcpIntFlag = true;
}

void setup() {
  Serial.begin(115200);
  delay(1000);
  //SPIClass SPI(HSPI);
  pinMode(CAN_INT, INPUT_PULLUP);
  attachInterrupt(digitalPinToInterrupt(CAN_INT), mcpIntISR, FALLING);

  // HSPI 사용 설정
  SPI.begin(14, 12, 13, CAN_CS);  // SCK=14, MISO=12, MOSI=13, CS=15

  // MCP2515 초기화 (500kbps @ 8MHz crystal 기준)
  if (CAN0.begin(MCP_STDEXT, CAN_500KBPS, MCP_8MHZ) == CAN_OK) {
    Serial.println("MCP2515 Initialized Successfully!");
    CAN0.setMode(MCP_NORMAL);  
  } else {
    Serial.println("Error Initializing MCP2515...");
    while (1);
  }

  // Normal 모드 진입
}

void loop() {
  unsigned long now = millis();

  if (now - lastSend >= 100) {  // 100ms 주기
    lastSend = now;

    // 테스트용 데이터 프레임
    byte data[8] = {0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77, 0x88};

    // 표준 ID 0x100, 데이터 길이 8바이트 송신
    byte sndStat = CAN0.sendMsgBuf(0x100, 0, 8, data);

    if (sndStat == CAN_OK) {
      Serial.println("Message Sent Successfully!");
    } else {
      Serial.println("Error Sending Message...");
    }
  }
}

  SPI.begin(14, 12, 13, CAN_CS);  // SCK=14, MISO=12, MOSI=13, CS=15

  // MCP2515 초기화 (500kbps @ 16MHz crystal 기준)
  if (CAN0.begin(MCP_ANY, CAN_500KBPS, MCP_16MHZ) == CAN_OK) {
    Serial.println("MCP2515 Initialized Successfully!");
  } else {
    Serial.println("Error Initializing MCP2515...");
    while (1);
  }

  // Normal 모드 진입
  CAN0.setMode(MCP_NORMAL);  
  pinMode(CAN_INT, INPUT);
}

void loop() {
  unsigned long now = millis();

  if (now - lastSend >= 100) {  // 100ms 주기
    lastSend = now;

    // 테스트용 데이터 프레임
    byte data[8] = {0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77, 0x88};

    // 표준 ID 0x100, 데이터 길이 8바이트 송신
    byte sndStat = CAN0.sendMsgBuf(0x100, 0, 8, data);

    if (sndStat == CAN_OK) {
      Serial.println("Message Sent Successfully!");
    } else {
      Serial.println("Error Sending Message...");
    }
  }
}

결과

위 라이브러리로 진행 해보니 두개의 spi가 동작이 되지 않았다

따라서 아래 라이브러리로 다시 코딩함

#include <Arduino.h>
#include <SPI.h>
#include <mcp2515.h>
#include <SD.h>
#include <freertos/FreeRTOS.h>
#include <freertos/task.h>
#include <freertos/queue.h>
#include <freertos/semphr.h>

// GPIO 핀 정의
#define CAN_INT_PIN 27
#define LOGGING_CONTROL_PIN 17
#define LOGGING_STATUS_LED 16
#define SD_READY_LED 26

// HSPI 핀 (CAN)
#define HSPI_MISO 12
#define HSPI_MOSI 13
#define HSPI_SCLK 14
#define HSPI_CS 15

// VSPI 핀 (SD Card - 기본값)
#define VSPI_MISO 19
#define VSPI_MOSI 23
#define VSPI_SCLK 18
#define VSPI_CS 5

// 버퍼 설정
#define CAN_QUEUE_SIZE 1000
#define FILE_BUFFER_SIZE 8192
#define MAX_FILENAME_LEN 32

// CAN 메시지 구조체 (바이너리 저장용)
struct CANMessage {
    uint32_t timestamp;  // 밀리초 타임스탬프
    uint32_t id;         // CAN ID
    uint8_t dlc;         // 데이터 길이
    uint8_t data[8];     // 데이터
} __attribute__((packed));

// 전역 변수
SPIClass hspi(HSPI);
SPIClass vspi(VSPI);
MCP2515 mcp2515(HSPI_CS, 10000000, &hspi);  // CS핀, SPI클럭, SPI객체

QueueHandle_t canQueue;
SemaphoreHandle_t sdMutex;
TaskHandle_t canRxTaskHandle = NULL;
TaskHandle_t sdWriteTaskHandle = NULL;
TaskHandle_t controlTaskHandle = NULL;

volatile bool loggingEnabled = false;
volatile bool sdCardReady = false;
File logFile;
char currentFilename[MAX_FILENAME_LEN];
uint8_t fileBuffer[FILE_BUFFER_SIZE];
uint16_t bufferIndex = 0;
uint32_t fileCounter = 0;

// CAN 인터럽트 핸들러
void IRAM_ATTR canISR() {
    BaseType_t xHigherPriorityTaskWoken = pdFALSE;
    if (canRxTaskHandle != NULL) {
        vTaskNotifyGiveFromISR(canRxTaskHandle, &xHigherPriorityTaskWoken);
        portYIELD_FROM_ISR(xHigherPriorityTaskWoken);
    }
}

// 새 로그 파일 생성
bool createNewLogFile() {
    if (logFile) {
        logFile.flush();
        logFile.close();
        vTaskDelay(pdMS_TO_TICKS(10)); // SD 카드에 쓰기 완료 대기
    }
    
    char filename[MAX_FILENAME_LEN];
    snprintf(filename, MAX_FILENAME_LEN, "/canlog_%05lu.bin", fileCounter++);
    
    logFile = SD.open(filename, FILE_WRITE);
    
    if (!logFile) {
        Serial.printf("파일 생성 실패: %s\n", filename);
        return false;
    }
    
    strncpy(currentFilename, filename, MAX_FILENAME_LEN);
    Serial.printf("새 로그 파일 생성: %s\n", currentFilename);
    return true;
}

// 버퍼를 SD 카드에 플러시
bool flushBuffer() {
    if (bufferIndex == 0) return true;
    
    if (xSemaphoreTake(sdMutex, portMAX_DELAY) == pdTRUE) {
        if (logFile) {
            size_t written = logFile.write(fileBuffer, bufferIndex);
            logFile.flush();
            xSemaphoreGive(sdMutex);
            
            if (written != bufferIndex) {
                Serial.println("SD 쓰기 오류!");
                return false;
            }
            bufferIndex = 0;
            return true;
        }
        xSemaphoreGive(sdMutex);
    }
    return false;
}

// CAN 수신 태스크 (최고 우선순위)
void canRxTask(void *pvParameters) {
    struct can_frame frame;
    CANMessage msg;
    
    Serial.println("CAN 수신 태스크 시작");
    
    while (1) {
        // 인터럽트 대기
        ulTaskNotifyTake(pdTRUE, portMAX_DELAY);
        
        // 모든 대기 중인 메시지 읽기
        while (mcp2515.readMessage(&frame) == MCP2515::ERROR_OK) {
            msg.timestamp = millis();
            msg.id = frame.can_id;
            msg.dlc = frame.can_dlc;
            memcpy(msg.data, frame.data, 8);
            
            // 큐가 가득 차면 오래된 메시지 버림 (오버플로우 방지)
            if (xQueueSend(canQueue, &msg, 0) != pdTRUE) {
                // 큐 가득 참 - 메시지 손실
                static uint32_t lastWarning = 0;
                if (millis() - lastWarning > 1000) {
                    Serial.println("경고: CAN 큐 오버플로우!");
                    lastWarning = millis();
                }
            }
        }
    }
}

// SD 카드 쓰기 태스크
void sdWriteTask(void *pvParameters) {
    CANMessage msg;
    
    Serial.println("SD 쓰기 태스크 시작");
    
    while (1) {
        // 큐에서 CAN 메시지 대기
        if (xQueueReceive(canQueue, &msg, pdMS_TO_TICKS(100)) == pdTRUE) {
            
            if (loggingEnabled && sdCardReady) {
                // 버퍼에 추가
                if (bufferIndex + sizeof(CANMessage) > FILE_BUFFER_SIZE) {
                    if (!flushBuffer()) {
                        digitalWrite(LOGGING_STATUS_LED, LOW);
                        continue;
                    }
                }
                
                memcpy(&fileBuffer[bufferIndex], &msg, sizeof(CANMessage));
                bufferIndex += sizeof(CANMessage);
                
                digitalWrite(LOGGING_STATUS_LED, HIGH);
            }
        } else {
            // 타임아웃 - 버퍼 플러시
            if (loggingEnabled && bufferIndex > 0) {
                flushBuffer();
            }
        }
    }
}

// 제어 태스크 (로깅 시작/정지 관리)
void controlTask(void *pvParameters) {
    bool lastLoggingState = false;
    
    Serial.println("제어 태스크 시작");
    
    while (1) {
        bool currentState = digitalRead(LOGGING_CONTROL_PIN);
        
        // 로깅 상태 변경 감지
        if (currentState != lastLoggingState) {
            if (currentState == HIGH && sdCardReady) {
                // 로깅 시작
                Serial.println("로깅 시작");
                
                if (xSemaphoreTake(sdMutex, pdMS_TO_TICKS(1000)) == pdTRUE) {
                    if (createNewLogFile()) {
                        loggingEnabled = true;
                        bufferIndex = 0;
                    }
                    xSemaphoreGive(sdMutex);
                }
            } else if (currentState == LOW && loggingEnabled) {
                // 로깅 정지
                Serial.println("로깅 정지");
                loggingEnabled = false;
                
                // 남은 데이터 플러시
                flushBuffer();
                
                if (xSemaphoreTake(sdMutex, pdMS_TO_TICKS(1000)) == pdTRUE) {
                    if (logFile) {
                        logFile.close();
                    }
                    xSemaphoreGive(sdMutex);
                }
                
                digitalWrite(LOGGING_STATUS_LED, LOW);
            }
            
            lastLoggingState = currentState;
        }
        
        vTaskDelay(pdMS_TO_TICKS(50));
    }
}

// SD 카드 모니터링 태스크
void sdMonitorTask(void *pvParameters) {
    Serial.println("SD 모니터 태스크 시작");
    
    while (1) {
        bool cardPresent = SD.begin(VSPI_CS, vspi);
        
        if (cardPresent != sdCardReady) {
            sdCardReady = cardPresent;
            digitalWrite(SD_READY_LED, sdCardReady ? HIGH : LOW);
            
            if (sdCardReady) {
                Serial.println("SD 카드 준비됨");
            } else {
                Serial.println("SD 카드 없음");
                if (loggingEnabled) {
                    loggingEnabled = false;
                    digitalWrite(LOGGING_STATUS_LED, LOW);
                }
            }
        }
        
        vTaskDelay(pdMS_TO_TICKS(1000));
    }
}

void setup() {
    Serial.begin(115200);
    delay(1000);
    Serial.println("\n\nESP32 CAN Logger 시작");
    
    // GPIO 초기화
    pinMode(LOGGING_CONTROL_PIN, INPUT);
    pinMode(LOGGING_STATUS_LED, OUTPUT);
    pinMode(SD_READY_LED, OUTPUT);
    pinMode(CAN_INT_PIN, INPUT_PULLUP);
    
    digitalWrite(LOGGING_STATUS_LED, LOW);
    digitalWrite(SD_READY_LED, LOW);
    
    // HSPI 초기화 (CAN)
    hspi.begin(HSPI_SCLK, HSPI_MISO, HSPI_MOSI, HSPI_CS);
    
    // VSPI 초기화 (SD Card)
    vspi.begin(VSPI_SCLK, VSPI_MISO, VSPI_MOSI, VSPI_CS);
    
    // MCP2515 초기화
    mcp2515.reset();
    mcp2515.setBitrate(CAN_1000KBPS, MCP_8MHZ);  // 8MHz 크리스탈 기준
    mcp2515.setNormalMode();
    
    Serial.println("MCP2515 초기화 완료 (1Mbps)");
    
    // SD 카드 초기화
    if (SD.begin(VSPI_CS, vspi)) {
        sdCardReady = true;
        digitalWrite(SD_READY_LED, HIGH);
        Serial.println("SD 카드 초기화 완료");
    } else {
        Serial.println("SD 카드 초기화 실패");
    }
    
    // RTOS 객체 생성
    canQueue = xQueueCreate(CAN_QUEUE_SIZE, sizeof(CANMessage));
    sdMutex = xSemaphoreCreateMutex();
    
    if (canQueue == NULL || sdMutex == NULL) {
        Serial.println("RTOS 객체 생성 실패!");
        while (1) delay(1000);
    }
    
    // 인터럽트 설정
    attachInterrupt(digitalPinToInterrupt(CAN_INT_PIN), canISR, FALLING);
    
    // 태스크 생성
    xTaskCreatePinnedToCore(
        canRxTask,
        "CAN_RX",
        4096,
        NULL,
        4,  // 최고 우선순위
        &canRxTaskHandle,
        1   // Core 1
    );
    
    xTaskCreatePinnedToCore(
        sdWriteTask,
        "SD_WRITE",
        12288,  // SD 작업에 더 큰 스택 필요
        NULL,
        3,  // 높은 우선순위
        &sdWriteTaskHandle,
        1   // Core 1
    );
    
    xTaskCreatePinnedToCore(
        controlTask,
        "CONTROL",
        8192,  // SD 파일 생성을 위해 큰 스택 필요
        NULL,
        2,  // 중간 우선순위
        &controlTaskHandle,
        0   // Core 0
    );
    
    xTaskCreatePinnedToCore(
        sdMonitorTask,
        "SD_MONITOR",
        4096,  // SD 초기화를 위해 더 큰 스택
        NULL,
        1,  // 낮은 우선순위
        NULL,
        0   // Core 0
    );
    
    Serial.println("모든 태스크 시작 완료");
    Serial.println("GPIO17: HIGH=로깅시작, LOW=로깅정지");
    Serial.println("GPIO16: 로깅 상태 LED");
    Serial.println("GPIO26: SD 카드 준비 LED");
}

void loop() {
    // FreeRTOS가 모든 것을 처리
    vTaskDelay(pdMS_TO_TICKS(1000));
    
    // 디버그 정보 출력
    static uint32_t lastPrint = 0;
    if (millis() - lastPrint > 5000) {
        Serial.printf("큐 사용: %d/%d, 로깅: %s, SD: %s\n",
            uxQueueMessagesWaiting(canQueue),
            CAN_QUEUE_SIZE,
            loggingEnabled ? "ON" : "OFF",
            sdCardReady ? "Ready" : "Not Ready"
        );
        lastPrint = millis();
    }
}

bin파일 분석 python코드

import struct
import csv
import sys
import os
from pathlib import Path

class CANLogConverter:
    """ESP32 CAN Logger 바이너리 파일을 CSV로 변환"""
    
    # 바이너리 구조체 형식: timestamp(4) + id(4) + dlc(1) + data(8) = 17 bytes
    RECORD_SIZE = 17
    RECORD_FORMAT = '<IIB8s'  # little-endian: uint32, uint32, uint8, 8 bytes
    
    def __init__(self, bin_file_path):
        self.bin_file_path = Path(bin_file_path)
        if not self.bin_file_path.exists():
            raise FileNotFoundError(f"파일을 찾을 수 없습니다: {bin_file_path}")
    
    def convert_to_csv(self, csv_file_path=None, include_header=True):
        """바이너리 파일을 CSV로 변환"""
        
        # CSV 파일 경로가 지정되지 않으면 자동 생성
        if csv_file_path is None:
            csv_file_path = self.bin_file_path.with_suffix('.csv')
        else:
            csv_file_path = Path(csv_file_path)
        
        record_count = 0
        error_count = 0
        
        try:
            with open(self.bin_file_path, 'rb') as bin_file:
                with open(csv_file_path, 'w', newline='', encoding='utf-8') as csv_file:
                    csv_writer = csv.writer(csv_file)
                    
                    # 헤더 작성
                    if include_header:
                        csv_writer.writerow([
                            'Timestamp_ms',
                            'CAN_ID',
                            'CAN_ID_Hex',
                            'DLC',
                            'Data_Hex',
                            'D0', 'D1', 'D2', 'D3', 'D4', 'D5', 'D6', 'D7'
                        ])
                    
                    # 바이너리 데이터 읽기 및 변환
                    while True:
                        data = bin_file.read(self.RECORD_SIZE)
                        
                        if len(data) == 0:
                            break  # 파일 끝
                        
                        if len(data) != self.RECORD_SIZE:
                            print(f"경고: 불완전한 레코드 (크기: {len(data)} bytes)")
                            error_count += 1
                            break
                        
                        try:
                            # 구조체 언팩
                            timestamp, can_id, dlc, data_bytes = struct.unpack(
                                self.RECORD_FORMAT, data
                            )
                            
                            # 데이터 바이트를 리스트로 변환 (DLC만큼만)
                            data_list = list(data_bytes[:dlc])
                            
                            # 나머지는 빈 값으로 채우기
                            data_list.extend([''] * (8 - len(data_list)))
                            
                            # 16진수 문자열 생성
                            data_hex = ' '.join(f'{b:02X}' for b in data_bytes[:dlc])
                            
                            # CSV 행 작성
                            csv_writer.writerow([
                                timestamp,
                                can_id,
                                f'0x{can_id:08X}',
                                dlc,
                                data_hex,
                                *data_list
                            ])
                            
                            record_count += 1
                            
                        except struct.error as e:
                            print(f"경고: 레코드 파싱 오류 - {e}")
                            error_count += 1
                            continue
        
        except Exception as e:
            print(f"오류 발생: {e}")
            return False
        
        print(f"\n변환 완료!")
        print(f"입력 파일: {self.bin_file_path}")
        print(f"출력 파일: {csv_file_path}")
        print(f"총 레코드: {record_count}")
        if error_count > 0:
            print(f"오류 레코드: {error_count}")
        
        return True
    
    def get_file_info(self):
        """바이너리 파일 정보 표시"""
        file_size = self.bin_file_path.stat().st_size
        record_count = file_size // self.RECORD_SIZE
        incomplete_bytes = file_size % self.RECORD_SIZE
        
        print(f"\n파일 정보:")
        print(f"  파일명: {self.bin_file_path.name}")
        print(f"  파일 크기: {file_size:,} bytes")
        print(f"  예상 레코드 수: {record_count:,}")
        if incomplete_bytes > 0:
            print(f"  불완전한 데이터: {incomplete_bytes} bytes")
        
        return record_count
    
    def preview_data(self, num_records=10):
        """처음 몇 개의 레코드 미리보기"""
        print(f"\n처음 {num_records}개 레코드 미리보기:")
        print("-" * 100)
        print(f"{'Timestamp':<12} {'CAN ID':<12} {'DLC':<5} {'Data'}")
        print("-" * 100)
        
        try:
            with open(self.bin_file_path, 'rb') as bin_file:
                for i in range(num_records):
                    data = bin_file.read(self.RECORD_SIZE)
                    
                    if len(data) != self.RECORD_SIZE:
                        break
                    
                    timestamp, can_id, dlc, data_bytes = struct.unpack(
                        self.RECORD_FORMAT, data
                    )
                    
                    data_hex = ' '.join(f'{b:02X}' for b in data_bytes[:dlc])
                    print(f"{timestamp:<12} 0x{can_id:08X}  {dlc:<5} {data_hex}")
        
        except Exception as e:
            print(f"오류: {e}")
        
        print("-" * 100)


def convert_multiple_files(input_pattern, output_dir=None):
    """여러 바이너리 파일을 일괄 변환"""
    import glob
    
    files = glob.glob(input_pattern)
    
    if not files:
        print(f"'{input_pattern}' 패턴과 일치하는 파일이 없습니다.")
        return
    
    print(f"\n{len(files)}개의 파일을 찾았습니다.")
    
    if output_dir:
        output_path = Path(output_dir)
        output_path.mkdir(parents=True, exist_ok=True)
    
    for i, file_path in enumerate(files, 1):
        print(f"\n[{i}/{len(files)}] 변환 중: {file_path}")
        
        try:
            converter = CANLogConverter(file_path)
            
            if output_dir:
                csv_path = output_path / Path(file_path).with_suffix('.csv').name
            else:
                csv_path = None
            
            converter.convert_to_csv(csv_path)
        
        except Exception as e:
            print(f"오류: {e}")


def main():
    """메인 함수"""
    if len(sys.argv) < 2:
        print("사용법:")
        print("  단일 파일 변환:")
        print("    python can_converter.py <input.bin> [output.csv]")
        print("\n  파일 정보 보기:")
        print("    python can_converter.py <input.bin> --info")
        print("\n  데이터 미리보기:")
        print("    python can_converter.py <input.bin> --preview [레코드수]")
        print("\n  여러 파일 일괄 변환:")
        print("    python can_converter.py --batch <패턴> [출력디렉토리]")
        print("\n예제:")
        print("    python can_converter.py canlog_00001.bin")
        print("    python can_converter.py canlog_00001.bin output.csv")
        print("    python can_converter.py canlog_00001.bin --info")
        print("    python can_converter.py canlog_00001.bin --preview 20")
        print("    python can_converter.py --batch \"canlog_*.bin\"")
        print("    python can_converter.py --batch \"canlog_*.bin\" output_folder")
        sys.exit(1)
    
    # 일괄 변환 모드
    if sys.argv[1] == '--batch':
        if len(sys.argv) < 3:
            print("오류: 파일 패턴을 지정하세요.")
            sys.exit(1)
        
        pattern = sys.argv[2]
        output_dir = sys.argv[3] if len(sys.argv) > 3 else None
        convert_multiple_files(pattern, output_dir)
        sys.exit(0)
    
    # 단일 파일 처리
    input_file = sys.argv[1]
    
    try:
        converter = CANLogConverter(input_file)
        
        # 정보 표시 모드
        if len(sys.argv) > 2 and sys.argv[2] == '--info':
            converter.get_file_info()
        
        # 미리보기 모드
        elif len(sys.argv) > 2 and sys.argv[2] == '--preview':
            num_records = int(sys.argv[3]) if len(sys.argv) > 3 else 10
            converter.get_file_info()
            converter.preview_data(num_records)
        
        # 변환 모드
        else:
            output_file = sys.argv[2] if len(sys.argv) > 2 else None
            converter.get_file_info()
            converter.convert_to_csv(output_file)
    
    except FileNotFoundError as e:
        print(f"오류: {e}")
        sys.exit(1)
    except Exception as e:
        print(f"예상치 못한 오류: {e}")
        sys.exit(1)


if __name__ == "__main__":
    main()

사용법

# 단일 파일 변환
python can_converter.py canlog_00001.bin

# 출력 파일명 지정
python can_converter.py canlog_00001.bin output.csv

# 파일 정보 보기
python can_converter.py canlog_00001.bin --info

# 데이터 미리보기 (20개 레코드)
python can_converter.py canlog_00001.bin --preview 20

# 여러 파일 일괄 변환
python can_converter.py --batch "canlog_*.bin"

# 특정 폴더로 출력
python can_converter.py --batch "canlog_*.bin" output_folder

dbc , asammdf 변환

import struct
import sys
from pathlib import Path
import numpy as np
from asammdf import MDF, Signal
import cantools
from collections import defaultdict
from datetime import datetime

class CANtoMDFConverter:
    """ESP32 CAN Logger 바이너리 파일을 DBC 파일을 참조하여 MDF로 변환"""
    
    # 바이너리 구조체 형식
    RECORD_SIZE = 17
    RECORD_FORMAT = '<IIB8s'  # little-endian: uint32, uint32, uint8, 8 bytes
    
    def __init__(self, bin_file_path, dbc_file_path):
        self.bin_file_path = Path(bin_file_path)
        self.dbc_file_path = Path(dbc_file_path)
        
        if not self.bin_file_path.exists():
            raise FileNotFoundError(f"바이너리 파일을 찾을 수 없습니다: {bin_file_path}")
        
        if not self.dbc_file_path.exists():
            raise FileNotFoundError(f"DBC 파일을 찾을 수 없습니다: {dbc_file_path}")
        
        # DBC 파일 로드
        print(f"DBC 파일 로딩: {self.dbc_file_path}")
        self.db = cantools.database.load_file(str(self.dbc_file_path))
        print(f"  - {len(self.db.messages)}개의 메시지 정의됨")
        print(f"  - {sum(len(msg.signals) for msg in self.db.messages)}개의 시그널 정의됨")
    
    def parse_binary_file(self):
        """바이너리 파일을 파싱하여 CAN 메시지 리스트 반환"""
        messages = []
        
        print(f"\n바이너리 파일 파싱: {self.bin_file_path}")
        
        with open(self.bin_file_path, 'rb') as f:
            record_count = 0
            error_count = 0
            
            while True:
                data = f.read(self.RECORD_SIZE)
                
                if len(data) == 0:
                    break
                
                if len(data) != self.RECORD_SIZE:
                    print(f"경고: 불완전한 레코드 (크기: {len(data)} bytes)")
                    error_count += 1
                    break
                
                try:
                    timestamp, can_id, dlc, data_bytes = struct.unpack(
                        self.RECORD_FORMAT, data
                    )
                    
                    messages.append({
                        'timestamp': timestamp / 1000.0,  # 밀리초를 초로 변환
                        'can_id': can_id,
                        'dlc': dlc,
                        'data': data_bytes[:dlc]
                    })
                    
                    record_count += 1
                    
                except struct.error as e:
                    print(f"경고: 레코드 파싱 오류 - {e}")
                    error_count += 1
                    continue
        
        print(f"  - 총 {record_count:,}개의 CAN 메시지 파싱됨")
        if error_count > 0:
            print(f"  - {error_count}개의 오류 발생")
        
        return messages
    
    def decode_messages(self, messages):
        """CAN 메시지를 DBC를 사용하여 디코딩"""
        print("\nCAN 메시지 디코딩 중...")
        
        # 시그널별 데이터 저장 (시그널명: {timestamps: [], values: []})
        signals_data = defaultdict(lambda: {'timestamps': [], 'values': [], 'unit': '', 'comment': ''})
        
        # 통계
        decoded_count = 0
        unknown_count = 0
        error_count = 0
        unknown_ids = set()
        
        for msg in messages:
            try:
                # DBC에서 메시지 정의 찾기
                db_message = self.db.get_message_by_frame_id(msg['can_id'])
                
                # 메시지 디코딩
                decoded = db_message.decode(msg['data'])
                
                # 각 시그널 데이터 저장
                for signal_name, value in decoded.items():
                    signals_data[signal_name]['timestamps'].append(msg['timestamp'])
                    signals_data[signal_name]['values'].append(value)
                    
                    # 시그널 메타데이터 저장 (첫 번째 만남에서만)
                    if not signals_data[signal_name]['unit']:
                        signal = db_message.get_signal_by_name(signal_name)
                        signals_data[signal_name]['unit'] = signal.unit or ''
                        signals_data[signal_name]['comment'] = signal.comment or ''
                
                decoded_count += 1
                
            except KeyError:
                # DBC에 정의되지 않은 CAN ID
                unknown_ids.add(msg['can_id'])
                unknown_count += 1
                
            except Exception as e:
                # 디코딩 오류
                error_count += 1
                if error_count <= 10:  # 처음 10개만 출력
                    print(f"  디코딩 오류 (ID: 0x{msg['can_id']:X}): {e}")
        
        print(f"  - {decoded_count:,}개의 메시지 디코딩 성공")
        print(f"  - {unknown_count:,}개의 메시지 (미정의 ID)")
        if unknown_ids:
            print(f"    미정의 CAN ID: {', '.join(f'0x{id:X}' for id in sorted(unknown_ids))}")
        if error_count > 0:
            print(f"  - {error_count}개의 디코딩 오류")
        print(f"  - 총 {len(signals_data)}개의 시그널 추출됨")
        
        return signals_data
    
    def create_mdf(self, signals_data, mdf_file_path=None, mdf_version='4.10'):
        """시그널 데이터로부터 MDF 파일 생성"""
        
        if mdf_file_path is None:
            mdf_file_path = self.bin_file_path.with_suffix('.mf4')
        else:
            mdf_file_path = Path(mdf_file_path)
        
        print(f"\nMDF 파일 생성 중: {mdf_file_path}")
        
        # MDF 객체 생성
        mdf = MDF(version=mdf_version)
        
        # 시그널을 MDF에 추가
        signal_count = 0
        
        for signal_name, data in signals_data.items():
            if not data['timestamps']:
                continue
            
            # numpy 배열로 변환
            timestamps = np.array(data['timestamps'], dtype=np.float64)
            values = np.array(data['values'], dtype=np.float64)
            
            # Signal 객체 생성
            signal = Signal(
                samples=values,
                timestamps=timestamps,
                name=signal_name,
                unit=data['unit'],
                comment=data['comment']
            )
            
            # MDF에 추가
            mdf.append([signal])
            signal_count += 1
            
            if signal_count % 100 == 0:
                print(f"  - {signal_count}개 시그널 추가됨...")
        
        print(f"  - 총 {signal_count}개의 시그널이 MDF에 추가됨")
        
        # 파일 저장
        print(f"MDF 파일 저장 중...")
        mdf.save(str(mdf_file_path), overwrite=True, compression=2)
        
        print(f"\n변환 완료!")
        print(f"  출력 파일: {mdf_file_path}")
        print(f"  파일 크기: {mdf_file_path.stat().st_size / 1024 / 1024:.2f} MB")
        
        return mdf_file_path
    
    def convert(self, mdf_file_path=None, mdf_version='4.10'):
        """전체 변환 프로세스 실행"""
        print("=" * 80)
        print("CAN Binary to MDF Converter")
        print("=" * 80)
        print(f"시작 시간: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        print()
        
        # 1. 바이너리 파일 파싱
        messages = self.parse_binary_file()
        
        if not messages:
            print("오류: 파싱된 메시지가 없습니다.")
            return None
        
        # 2. 메시지 디코딩
        signals_data = self.decode_messages(messages)
        
        if not signals_data:
            print("오류: 디코딩된 시그널이 없습니다.")
            return None
        
        # 3. MDF 파일 생성
        output_file = self.create_mdf(signals_data, mdf_file_path, mdf_version)
        
        print()
        print("=" * 80)
        print(f"완료 시간: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        print("=" * 80)
        
        return output_file
    
    def get_dbc_info(self):
        """DBC 파일 정보 출력"""
        print("\n" + "=" * 80)
        print("DBC 파일 정보")
        print("=" * 80)
        print(f"파일: {self.dbc_file_path}")
        print(f"\n총 {len(self.db.messages)}개의 메시지:")
        print("-" * 80)
        
        for msg in sorted(self.db.messages, key=lambda m: m.frame_id):
            print(f"  0x{msg.frame_id:03X} ({msg.frame_id:4d}) - {msg.name:30s} "
                  f"({len(msg.signals):2d} signals)")
            
            for sig in msg.signals:
                unit_str = f" [{sig.unit}]" if sig.unit else ""
                print(f"      └─ {sig.name}{unit_str}")
        
        print("=" * 80)


def batch_convert(bin_pattern, dbc_file, output_dir=None, mdf_version='4.10'):
    """여러 바이너리 파일을 일괄 변환"""
    import glob
    
    files = glob.glob(bin_pattern)
    
    if not files:
        print(f"'{bin_pattern}' 패턴과 일치하는 파일이 없습니다.")
        return
    
    print(f"\n{len(files)}개의 파일을 찾았습니다.")
    
    if output_dir:
        output_path = Path(output_dir)
        output_path.mkdir(parents=True, exist_ok=True)
    
    success_count = 0
    fail_count = 0
    
    for i, file_path in enumerate(files, 1):
        print(f"\n{'#' * 80}")
        print(f"[{i}/{len(files)}] 변환 중: {file_path}")
        print('#' * 80)
        
        try:
            converter = CANtoMDFConverter(file_path, dbc_file)
            
            if output_dir:
                mdf_path = output_path / Path(file_path).with_suffix('.mf4').name
            else:
                mdf_path = None
            
            converter.convert(mdf_path, mdf_version)
            success_count += 1
        
        except Exception as e:
            print(f"\n오류 발생: {e}")
            import traceback
            traceback.print_exc()
            fail_count += 1
    
    print(f"\n{'=' * 80}")
    print(f"일괄 변환 완료: 성공 {success_count}개, 실패 {fail_count}개")
    print('=' * 80)


def main():
    """메인 함수"""
    if len(sys.argv) < 3:
        print("사용법:")
        print("  단일 파일 변환:")
        print("    python can_to_mdf.py <input.bin> <database.dbc> [output.mf4] [--version=4.10]")
        print("\n  DBC 정보 보기:")
        print("    python can_to_mdf.py --info <database.dbc>")
        print("\n  여러 파일 일괄 변환:")
        print("    python can_to_mdf.py --batch <패턴> <database.dbc> [출력디렉토리] [--version=4.10]")
        print("\n예제:")
        print("    python can_to_mdf.py canlog_00001.bin vehicle.dbc")
        print("    python can_to_mdf.py canlog_00001.bin vehicle.dbc output.mf4")
        print("    python can_to_mdf.py canlog_00001.bin vehicle.dbc output.mf4 --version=4.10")
        print("    python can_to_mdf.py --info vehicle.dbc")
        print("    python can_to_mdf.py --batch \"canlog_*.bin\" vehicle.dbc")
        print("    python can_to_mdf.py --batch \"canlog_*.bin\" vehicle.dbc output_folder")
        print("\nMDF 버전: 3.30, 4.00, 4.10, 4.11 (기본값: 4.10)")
        print("\n필요한 라이브러리:")
        print("    pip install asammdf cantools numpy")
        sys.exit(1)
    
    # MDF 버전 파싱
    mdf_version = '4.10'
    for arg in sys.argv:
        if arg.startswith('--version='):
            mdf_version = arg.split('=')[1]
    
    # DBC 정보 보기 모드
    if sys.argv[1] == '--info':
        if len(sys.argv) < 3:
            print("오류: DBC 파일을 지정하세요.")
            sys.exit(1)
        
        try:
            # 더미 바이너리 파일로 컨버터 생성
            converter = CANtoMDFConverter(__file__, sys.argv[2])
            converter.get_dbc_info()
        except FileNotFoundError as e:
            print(f"오류: {e}")
            sys.exit(1)
        except Exception as e:
            print(f"오류: {e}")
            import traceback
            traceback.print_exc()
            sys.exit(1)
        
        sys.exit(0)
    
    # 일괄 변환 모드
    if sys.argv[1] == '--batch':
        if len(sys.argv) < 4:
            print("오류: 파일 패턴과 DBC 파일을 지정하세요.")
            sys.exit(1)
        
        pattern = sys.argv[2]
        dbc_file = sys.argv[3]
        output_dir = sys.argv[4] if len(sys.argv) > 4 and not sys.argv[4].startswith('--') else None
        
        batch_convert(pattern, dbc_file, output_dir, mdf_version)
        sys.exit(0)
    
    # 단일 파일 변환
    input_file = sys.argv[1]
    dbc_file = sys.argv[2]
    output_file = None
    
    if len(sys.argv) > 3 and not sys.argv[3].startswith('--'):
        output_file = sys.argv[3]
    
    try:
        converter = CANtoMDFConverter(input_file, dbc_file)
        converter.convert(output_file, mdf_version)
    
    except FileNotFoundError as e:
        print(f"오류: {e}")
        sys.exit(1)
    except Exception as e:
        print(f"예상치 못한 오류: {e}")
        import traceback
        traceback.print_exc()
        sys.exit(1)


if __name__ == "__main__":
    main()

# 단일 파일 변환
python can_to_mdf.py canlog_00001.bin vehicle.dbc

# 출력 파일명 지정
python can_to_mdf.py canlog_00001.bin vehicle.dbc output.mf4

# MDF 버전 지정
python can_to_mdf.py canlog_00001.bin vehicle.dbc output.mf4 --version=4.10

# DBC 파일 정보 보기
python can_to_mdf.py --info vehicle.dbc

# 여러 파일 일괄 변환
python can_to_mdf.py --batch "canlog_*.bin" vehicle.dbc

# 특정 폴더로 출력
python can_to_mdf.py --batch "canlog_*.bin" vehicle.dbc output_folder

By byun

답글 남기기

이메일 주소는 공개되지 않습니다. 필수 필드는 *로 표시됩니다