# lib9p_util/nut_gen/app.py - Application-specific code for NUT file generation
#
# Copyright (C) 2025 Luke T. Shumaker
# SPDX-License-Identifier: AGPL-3.0-or-later

from . import nut

# pylint: disable=unused-variable
__all__ = [
    "append_intro",
    "State",
    "fb_w",
    "fb_h",
    "fb_size",
]

# Framebuffer geometry.  fb_w/fb_h are written to the stream header as
# width/height; fb_size is used wherever the size of one frame's data is
# needed (max_distance, the size_lsb/size_msb split, State.last_frame_len).
fb_w = 720 // 2
fb_h = 576
fb_size = fb_w * fb_h

# Written to the stream header as sample_width/sample_height.
sample_w = 2
sample_h = 1

# fb_size is encoded as (data_size_msb * frame_size_msb_mul) + size_lsb.
frame_size_msb_mul = 0x2000  # must be less than 0x4000
# Exact length in bytes of the frame header emitted by
# State.append_frame_intro (asserted there).
frame_max_header = 16


def append_intro(buf: bytearray) -> None:
    """Append the fixed start-of-file data to ``buf``.

    Writes, in order: the NUT file ID string, one main_header packet
    (version, a single 1ns time base, and the frame-code table), and one
    stream_header packet describing stream 0 as a video stream.
    """

    def BOGUS(x: int) -> int:
        # Identity function.  Presumably tags values that are only
        # placeholders -- NOTE(review): confirm intent with the author.
        return x

    def code_after(code: int) -> int:
        # Next frame code to describe; ord("N") is never used as a frame
        # code.  NOTE(review): presumably because "N" begins NUT
        # startcodes -- confirm against the NUT spec.
        following = code + 1
        if following == ord("N"):
            following += 1
        return following

    buf.extend(nut.file_id_string)

    # main_header packet #######################################################
    with nut.append_packet(buf, nut.StartCode.main) as hdr:
        # head
        nut.append_vu(hdr, 3)  #! version
        nut.append_vu(hdr, 1)  #! stream_count
        nut.append_vu(hdr, frame_max_header + fb_size)  #! max_distance

        # time bases
        nut.append_vu(hdr, 1)  #! time_base_count
        # time_bases[0] = 1ns
        nut.append_vu(hdr, 1)  #! numerator
        nut.append_vu(hdr, 1_000_000_000)  #! denominator

        # frame codes
        #  "A muxer SHOULD mark [frame codes] 0x00 and 0xFF as invalid"
        #
        # NOTE(review): the two "invalid" entries below stop after 2 fields
        # and put the number of codes they cover in the size_msb_mul slot;
        # this presumably relies on the NUT rule for deriving `count` when
        # field_count < 6 -- confirm against the NUT spec.
        frame_code = 0

        # frame_code=0 (invalid)
        nut.append_vu(hdr, nut.FrameFlag.INVALID)  #! flags
        nut.append_vu(hdr, 2)  #! field_count
        nut.append_vu(hdr, BOGUS(0))  #! 0: fields.pts
        nut.append_vu(hdr, 1)  #! 1: fields.size_msb_mul
        frame_code = code_after(frame_code)

        # frame_code=1 (main)
        main_flags = (
            nut.FrameFlag.KEY  # because they're full framebuffers, all frames are keyframes
            | nut.FrameFlag.SIZE_MSB  # 720*576/2 > 16384 (the max val of data_size_lsb)
            | nut.FrameFlag.CODED_PTS  # framerate is unknown, each frame must have a timestamp
            | nut.FrameFlag.CHECKSUM  # framerate is unknown, guard against exceeding max_pts_distance
        )
        nut.append_vu(hdr, main_flags)  #! flags
        nut.append_vu(hdr, 6)  #! field_count
        nut.append_vu(hdr, BOGUS(0))  #! 0: fields.pts
        nut.append_vu(hdr, frame_size_msb_mul)  #! 1: fields.size_msb_mul
        nut.append_vu(hdr, 0)  #! 2: fields.stream
        nut.append_vu(hdr, fb_size % frame_size_msb_mul)  #! 3: fields.size_lsb
        nut.append_vu(hdr, 0)  #! 4: fields.reserved
        nut.append_vu(hdr, 1)  #! 5: fields.count
        frame_code = code_after(frame_code)

        # frame_code=N-255 (invalid): every remaining code up to 255
        remaining_codes = 256 - frame_code
        if frame_code < ord("N"):
            # ord("N") lies inside the remaining range and is not counted.
            remaining_codes -= 1
        nut.append_vu(hdr, nut.FrameFlag.INVALID)  #! flags
        nut.append_vu(hdr, 2)  #! field_count
        nut.append_vu(hdr, BOGUS(0))  #! 0: fields.pts
        nut.append_vu(hdr, remaining_codes)  #! 1: fields.size_msb_mul

        # tail
        nut.append_vu(hdr, 0)  #! header_count_minus_1

    # stream_header packet #####################################################
    with nut.append_packet(buf, nut.StartCode.stream) as strm:
        nut.append_vu(strm, 0)  #! stream_id
        nut.append_vu(strm, nut.StreamClass.VIDEO)  #! stream_class
        nut.append_vb(strm, b"BGR\x08")  #! fourcc
        nut.append_vu(strm, 0)  #! time_base_id
        nut.append_vu(strm, 0)  #! max_pts_shift
        nut.append_vu(strm, BOGUS(0))  #! max_pts_distance (all frames have a checksum)
        nut.append_vu(strm, 0)  #! decode_delay
        nut.append_vu(strm, 0)  #! stream_flags
        nut.append_vb(strm, b"")  #! codec_specific_data
        nut.append_vu(strm, fb_w)  #! width
        nut.append_vu(strm, fb_h)  #! height
        nut.append_vu(strm, sample_w)  #! sample_width
        nut.append_vu(strm, sample_h)  #! sample_height
        nut.append_vu(strm, nut.ColorSpace.UNKNOWN)  #! colorspace_type


class State:
    """Per-file muxing state carried from one frame to the next."""

    # Bytes attributed to the previous frame: the syncpoint packet and frame
    # header written by append_frame_intro(), plus fb_size.  0 until the
    # first call.
    last_frame_len: int

    def __init__(self) -> None:
        self.last_frame_len = 0

    def append_frame_intro(self, buf: bytearray, time_ns: int) -> None:
        """Append a syncpoint packet and a frame header to ``buf``.

        ``time_ns`` is written both as the syncpoint's global_key_pts and as
        the frame's coded_pts.  Afterwards ``last_frame_len`` is set to the
        number of bytes appended here plus ``fb_size``; the frame data itself
        is not written by this method (presumably the caller appends
        ``fb_size`` bytes next -- verify against the caller).
        """
        sync_start = len(buf)

        # syncpoint packet
        with nut.append_packet(buf, nut.StartCode.syncpoint) as sync:
            nut.append_vu(sync, time_ns, minbytes=10, maxbytes=10)  #! global_key_pts
            prev_len_div16 = self.last_frame_len // 16
            nut.append_vu(sync, prev_len_div16, minbytes=2, maxbytes=2)  #! back_ptr_div16

        # frame header (1+10+1+4 = 16 bytes)
        header_start = len(buf)
        nut.append_u8(buf, 1)  #! frame_code (1 byte)
        nut.append_vu(buf, time_ns, minbytes=10, maxbytes=10)  #! coded_pts (<=10 bytes)
        nut.append_vu(buf, fb_size // frame_size_msb_mul)  #! data_size_msb (1 byte)
        # The checksum covers the header bytes written so far.
        header_crc = nut.crc32(buf[header_start:])
        nut.append_u32(buf, header_crc)  #! checksum (4 bytes)

        # `==` instead of `<=` because we set minbytes=
        header_len = len(buf) - header_start
        assert header_len == frame_max_header

        # return
        self.last_frame_len = (len(buf) - sync_start) + fb_size