rm.manager의 소스 코드

from pathlib import (
    Path,
)

from bluebase.pf import (
    PfPageHeader,
    PfManager,
)
from .error import (
    RmInvalidRecordSizeError,
)
from .page import (
    RmPageHeader,
    RmPageBitmap,
)
from .file import (
    RmFileHeader,
    RmFile,
)


__all__ = (
    'RmManager',
)


[문서] class RmManager: """ RM component의 메인 manager 클래스입니다. Attributes: _pf_manager (PfManager): PF component의 manager. """
[문서] def __init__(self, pf_manager: PfManager | None = None, ) -> None: """ manager를 초기화합니다. Args: pf_manager (PfManager | None): PF component의 manager (기본값: `None`). `None` 이면 `PfManager` 를 생성합니다. """ self._pf_manager = pf_manager or PfManager()
[문서] def create_file(self, path: Path, record_size: int) -> RmFile: """ TODO: 설명 Args: path (Path): 대상 file의 경로. record_size (int): record의 크기. Returns: RmFile: 생성된 file. Raises: RmInvalidRecordSizeError: `record_size` 가 유효하지 않은 경우. """ max_record_size = self._pf_manager.page_size - sum(( PfPageHeader.SIZE, RmPageHeader.SIZE, RmPageBitmap.calc_size(1), )) if record_size < 1 or record_size > max_record_size: raise RmInvalidRecordSizeError( f"`record_size`는 1 이상, {max_record_size} 이하만 가능합니다 " f"(현재 입력: {record_size}).", ) pf_file = self._pf_manager.create_file(path) header_page = pf_file.allocate_page() with header_page.pinned(): header = RmFileHeader( page=header_page, offset=PfPageHeader.SIZE, limit=RmFileHeader.SIZE, ) header.record_size = record_size header.first_available_pid = RmFileHeader.NO_AVAILABLE_PAGE header_page.persist() rm_file = RmFile(pf_file, record_size) return rm_file
[문서] def destroy_file(self, path: Path) -> None: """ file을 삭제합니다. Args: path (Path): 대상 file의 경로. """ self._pf_manager.destroy_file(path)
[문서] def open_file(self, path: Path) -> RmFile: """ TODO: 설명 Args: path (Path): 대상 file의 경로. Returns: RmFile: 열린 file. """ pf_file = self._pf_manager.open_file(path) header_page = pf_file.get_page(RmFileHeader.PID) header = RmFileHeader( page=header_page, offset=PfPageHeader.SIZE, limit=RmFileHeader.SIZE, ) with header_page.pinned(): record_size = header.record_size rm_file = RmFile(pf_file, record_size) return rm_file
[문서] def close_file(self, path: Path) -> None: """ TODO: 설명 Args: path (Path): 대상 file의 경로. """ self._pf_manager.close_file(path)