# lib9p_util/nut_gen/nut.py - Low-level library for the NUT container format # # Copyright (C) 2025 Luke T. Shumaker # SPDX-License-Identifier: AGPL-3.0-or-later import contextlib import enum import typing type Buffer = bytes | bytearray | memoryview # pylint: disable=unused-variable __all__ = [ "file_id_string", "StartCode", "FrameFlag", "MainFlag", "UNKNOWN_MATCH_TIME", "StreamClass", "StreamFlag", "ColorSpace", "append_u8", "append_u32", "append_vu", "append_vs", "append_vb", "crc32_table", "crc32", "append_packet", ] file_id_string = b"nut/multimedia container\0" class StartCode(enum.Enum): main = bytes([ord("N"), ord("M"), 0x7A, 0x56, 0x1F, 0x5F, 0x04, 0xAD]) stream = bytes([ord("N"), ord("S"), 0x11, 0x40, 0x5B, 0xF2, 0xF9, 0xDB]) syncpoint = bytes([ord("N"), ord("K"), 0xE4, 0xAD, 0xEE, 0xCA, 0x45, 0x69]) index = bytes([ord("N"), ord("X"), 0xDD, 0x67, 0x2F, 0x23, 0xE6, 0x4E]) info = bytes([ord("N"), ord("I"), 0xAB, 0x68, 0xB5, 0x96, 0xBA, 0x78]) class FrameFlag(enum.IntFlag): KEY = 1 << 0 EOR = 1 << 1 _2 = 1 << 2 CODED_PTS = 1 << 3 STREAM_ID = 1 << 4 SIZE_MSB = 1 << 5 CHECKSUM = 1 << 6 RESERVED = 1 << 7 SM_DATA = 1 << 8 _9 = 1 << 9 HEADER_IDX = 1 << 10 MATCH_TIME = 1 << 11 CODED = 1 << 12 INVALID = 1 << 13 class MainFlag(enum.IntFlag): BROADCAST_MODE = 1 << 0 PIPE_MODE = 1 << 1 UNKNOWN_MATCH_TIME = 1 - (1 << 62) class StreamClass(enum.IntEnum): VIDEO = 0 AUDIO = 1 SUBTITLES = 2 USERDATA = 3 class StreamFlag(enum.IntFlag): FIXED_FPS = 1 << 1 class ColorSpace(enum.IntEnum): UNKNOWN = 0 ITU642_TRUNC = 1 ITU709_TRUNC = 2 ITU642_FULL = 17 ITU709_FILL = 18 # Basic fixed-length types def append_u8(buf: bytearray, val: int, /) -> None: assert 0 <= val < 2**8 assert val == 1 buf.append(val) def append_u32(buf: bytearray, val: int, /) -> None: assert 0 <= val < 2**32 # big-endian buf.append((val >> (8 * 3)) & 0xFF) buf.append((val >> (8 * 2)) & 0xFF) buf.append((val >> (8 * 1)) & 0xFF) buf.append((val >> (8 * 0)) & 0xFF) # Basic variable-length types def append_vu( buf: bytearray, val: int, /, minbytes: int = 1, maxbytes: int | None = None ) -> None: assert 0 <= val nbytes = max((val.bit_length() + 6) // 7, minbytes) if maxbytes is not None: assert nbytes <= maxbytes for i in range(nbytes): buf.append( (0 if i == nbytes - 1 else 0x80) | ((val >> ((nbytes - 1 - i) * 7)) & 0x7F) ) def append_vs(buf: bytearray, val: int) -> None: if val < 0: temp = ((-val) << 1) | 1 else: temp = val << 1 temp -= 1 append_vu(buf, temp) def append_vb(buf: bytearray, dat: Buffer) -> None: append_vu(buf, len(dat)) buf.extend(dat) _u32_max = 0xFFFFFFFF def _crc32_maketable(poly: int, width: int) -> list[int]: assert width.bit_count() == 1 assert width <= 8 poly &= _u32_max table = [] for i in range(1 << width): tmp = i << (32 - width) for _ in range(width): if tmp & 0x80000000: tmp = ((tmp << 1) & _u32_max) ^ poly else: tmp = (tmp << 1) & _u32_max table.append(tmp) return table def _crc32(poly: int, start: int, dat: Buffer, /, *, table_width: int = 4) -> int: table = _crc32_maketable(poly, table_width) crc = start while dat: crc ^= (dat[0] << 24) & _u32_max for i in range(8 // table_width): crc = ((crc << table_width) & _u32_max) ^ table[crc >> (32 - table_width)] dat = dat[1:] return crc def crc32_table(table_width: int) -> list[int]: # "Generator polynomial is 0x104C11DB7. Starting value is zero." return _crc32_maketable(0x104C11DB7, table_width) def crc32(dat: Buffer, /, *, table_width: int = 4) -> int: # "Generator polynomial is 0x104C11DB7. Starting value is zero." return _crc32(0x104C11DB7, 0, dat, table_width=table_width) @contextlib.contextmanager def append_packet(buf: bytearray, startcode: StartCode) -> typing.Iterator[bytearray]: start = len(buf) dat = bytearray() yield dat assert len(buf) == start buf.extend(startcode.value) append_vu(buf, len(dat) + 4) if len(dat) + 4 > 4096: append_u32(buf, crc32(buf[start:])) buf.extend(dat) append_u32(buf, crc32(dat))