1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
|
# lib9p_util/nut_gen/app.py - Application-specific code for NUT file generation
#
# Copyright (C) 2025 Luke T. Shumaker <lukeshu@lukeshu.com>
# SPDX-License-Identifier: AGPL-3.0-or-later
from . import nut
# pylint: disable=unused-variable
__all__ = [
"append_intro",
"State",
"fb_w",
"fb_h",
"fb_size",
]
fb_w = 720 // 2
fb_h = 576
fb_size = fb_w * fb_h
sample_w = 2
sample_h = 1
frame_size_msb_mul = 0x2000 # must be less than 0x4000
frame_max_header = 16
def append_intro(buf: bytearray) -> None:
def BOGUS(x: int) -> int:
return x
buf.extend(nut.file_id_string)
# main_header packet #######################################################
with nut.append_packet(buf, nut.StartCode.main) as pkt:
frame_code = 0
def INC_FRAME_CODE() -> None:
nonlocal frame_code
frame_code += 1
if frame_code == ord("N"):
frame_code += 1
# head
nut.append_vu(pkt, 3) #! version
nut.append_vu(pkt, 1) #! stream_count
nut.append_vu(pkt, frame_max_header + fb_size) #! max_distance
# time bases
nut.append_vu(pkt, 1) #! time_base_count
# time_bases[0] = 1ns
nut.append_vu(pkt, 1) #! numerator
nut.append_vu(pkt, 1000000000) #! denominator
# frame codes
# "A muxer SHOULD mark [frame codes] 0x00 and 0xFF as invalid"
# frame_code=0 (invalid)
nut.append_vu(pkt, nut.FrameFlag.INVALID) #! flags
nut.append_vu(pkt, 2) #! field_count
nut.append_vu(pkt, BOGUS(0)) #! 0: fields.pts
nut.append_vu(pkt, 1) #! 1: fields.size_msb_mul
INC_FRAME_CODE()
# frame_code=1 (main)
nut.append_vu(
pkt, #! flags
nut.FrameFlag.KEY # because they're full framebuffers, all frames are keyframes
| nut.FrameFlag.SIZE_MSB # 720*576/2 > 16384 (the max val of data_size_lsb)
| nut.FrameFlag.CODED_PTS # framerate is unknown, each frame must have a timestamp
| nut.FrameFlag.CHECKSUM,
) # framerate is unknown, guard against exceeding max_pts_distance
nut.append_vu(pkt, 6) #! field_count
nut.append_vu(pkt, BOGUS(0)) #! 0: fields.pts
nut.append_vu(pkt, frame_size_msb_mul) #! 1: fields.size_msb_mul
nut.append_vu(pkt, 0) #! 2: fields.stream
nut.append_vu(pkt, fb_size % frame_size_msb_mul) #! 3: fields.size_lsb
nut.append_vu(pkt, 0) #! 4: fields.reserved
nut.append_vu(pkt, 1) #! 5: fields.count
INC_FRAME_CODE()
# frame_code=N-255 (invalid)
nut.append_vu(pkt, nut.FrameFlag.INVALID) #! flags
nut.append_vu(pkt, 2) #! field_count
nut.append_vu(pkt, BOGUS(0)) #! 0: fields.pts
nut.append_vu(
pkt, #! 1: fields.size_msb_mul
256 - frame_code - (1 if frame_code < ord("N") else 0),
)
# tail
nut.append_vu(pkt, 0) #! header_count_minus_1
# stream_header packet #####################################################
with nut.append_packet(buf, nut.StartCode.stream) as pkt:
nut.append_vu(pkt, 0) #! stream_id
nut.append_vu(pkt, nut.StreamClass.VIDEO) #! stream_class
nut.append_vb(pkt, b"BGR\x08") #! fourcc
nut.append_vu(pkt, 0) #! time_base_id
nut.append_vu(pkt, 0) #! max_pts_shift
nut.append_vu(pkt, BOGUS(0)) #! max_pts_distance (all frames have a checksum)
nut.append_vu(pkt, 0) #! decode_delay
nut.append_vu(pkt, 0) #! stream_flags
nut.append_vb(pkt, b"") #! codec_specific_data
nut.append_vu(pkt, fb_w) #! width
nut.append_vu(pkt, fb_h) #! height
nut.append_vu(pkt, sample_w) #! sample_width
nut.append_vu(pkt, sample_h) #! sample_height
nut.append_vu(pkt, nut.ColorSpace.UNKNOWN) #! colorspace_type
class State:
last_frame_len: int
def __init__(self) -> None:
self.last_frame_len = 0
def append_frame_intro(self, buf: bytearray, time_ns: int) -> None:
start = len(buf)
# syncpoint packet
with nut.append_packet(buf, nut.StartCode.syncpoint) as pkt:
nut.append_vu(pkt, time_ns, minbytes=10, maxbytes=10) #! global_key_pts
nut.append_vu(
pkt, self.last_frame_len // 16, minbytes=2, maxbytes=2
) #! back_ptr_div16
# frame header (1+10+1+4 = 16 bytes)
hdr_start = len(buf)
nut.append_u8(buf, 1) #! frame_code (1 byte)
nut.append_vu(buf, time_ns, minbytes=10, maxbytes=10) #! coded_pts (<=10 bytes)
nut.append_vu(buf, fb_size // frame_size_msb_mul) #! data_size_msb (1 byte)
nut.append_u32(buf, nut.crc32(buf[hdr_start:])) #! checksum (4 bytes)
# `==` instead of `<=` because we set minbytes=
assert len(buf) - hdr_start == frame_max_header
# return
self.last_frame_len = (len(buf) - start) + fb_size
|