summaryrefslogtreecommitdiff
path: root/lib9p_util/nut_gen/app.py
blob: d3fe0d5013216d3ccb67bb5adb62ce6ad9ff1155 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
# lib9p_util/nut_gen/app.py - Application-specific code for NUT file generation
#
# Copyright (C) 2025  Luke T. Shumaker <lukeshu@lukeshu.com>
# SPDX-License-Identifier: AGPL-3.0-or-later

from . import nut

# pylint: disable=unused-variable
__all__ = [
    "append_intro",
    "State",
    "fb_w",
    "fb_h",
    "fb_size",
]

fb_w = 720 // 2
fb_h = 576
fb_size = fb_w * fb_h
sample_w = 2
sample_h = 1

frame_size_msb_mul = 0x2000  # must be less than 0x4000
frame_max_header = 16


def append_intro(buf: bytearray) -> None:
    def BOGUS(x: int) -> int:
        return x

    buf.extend(nut.file_id_string)

    # main_header packet #######################################################
    with nut.append_packet(buf, nut.StartCode.main) as pkt:
        frame_code = 0

        def INC_FRAME_CODE() -> None:
            nonlocal frame_code
            frame_code += 1
            if frame_code == ord("N"):
                frame_code += 1

        # head
        nut.append_vu(pkt, 3)  #! version
        nut.append_vu(pkt, 1)  #! stream_count
        nut.append_vu(pkt, frame_max_header + fb_size)  #! max_distance
        # time bases
        nut.append_vu(pkt, 1)  #! time_base_count
        # time_bases[0] = 1ns
        nut.append_vu(pkt, 1)  #! numerator
        nut.append_vu(pkt, 1000000000)  #! denominator

        # frame codes
        # "A muxer SHOULD mark [frame codes] 0x00 and 0xFF as invalid"

        # frame_code=0 (invalid)
        nut.append_vu(pkt, nut.FrameFlag.INVALID)  #! flags
        nut.append_vu(pkt, 2)  #! field_count
        nut.append_vu(pkt, BOGUS(0))  #! 0: fields.pts
        nut.append_vu(pkt, 1)  #! 1: fields.size_msb_mul
        INC_FRAME_CODE()

        # frame_code=1 (main)
        nut.append_vu(
            pkt,  #! flags
            nut.FrameFlag.KEY  # because they're full framebuffers, all frames are keyframes
            | nut.FrameFlag.SIZE_MSB  # 720*576/2 > 16384 (the max val of data_size_lsb)
            | nut.FrameFlag.CODED_PTS  # framerate is unknown, each frame must have a timestamp
            | nut.FrameFlag.CHECKSUM,
        )  # framerate is unknown, guard against exceeding max_pts_distance
        nut.append_vu(pkt, 6)  #! field_count
        nut.append_vu(pkt, BOGUS(0))  #! 0: fields.pts
        nut.append_vu(pkt, frame_size_msb_mul)  #! 1: fields.size_msb_mul
        nut.append_vu(pkt, 0)  #! 2: fields.stream
        nut.append_vu(pkt, fb_size % frame_size_msb_mul)  #! 3: fields.size_lsb
        nut.append_vu(pkt, 0)  #! 4: fields.reserved
        nut.append_vu(pkt, 1)  #! 5: fields.count
        INC_FRAME_CODE()

        # frame_code=N-255 (invalid)
        nut.append_vu(pkt, nut.FrameFlag.INVALID)  #! flags
        nut.append_vu(pkt, 2)  #! field_count
        nut.append_vu(pkt, BOGUS(0))  #! 0: fields.pts
        nut.append_vu(
            pkt,  #! 1: fields.size_msb_mul
            256 - frame_code - (1 if frame_code < ord("N") else 0),
        )

        # tail
        nut.append_vu(pkt, 0)  #! header_count_minus_1

    # stream_header packet #####################################################
    with nut.append_packet(buf, nut.StartCode.stream) as pkt:
        nut.append_vu(pkt, 0)  #! stream_id
        nut.append_vu(pkt, nut.StreamClass.VIDEO)  #! stream_class
        nut.append_vb(pkt, b"BGR\x08")  #! fourcc
        nut.append_vu(pkt, 0)  #! time_base_id
        nut.append_vu(pkt, 0)  #! max_pts_shift
        nut.append_vu(pkt, BOGUS(0))  #! max_pts_distance (all frames have a checksum)
        nut.append_vu(pkt, 0)  #! decode_delay
        nut.append_vu(pkt, 0)  #! stream_flags
        nut.append_vb(pkt, b"")  #! codec_specific_data
        nut.append_vu(pkt, fb_w)  #! width
        nut.append_vu(pkt, fb_h)  #! height
        nut.append_vu(pkt, sample_w)  #! sample_width
        nut.append_vu(pkt, sample_h)  #! sample_height
        nut.append_vu(pkt, nut.ColorSpace.UNKNOWN)  #! colorspace_type


class State:
    last_frame_len: int

    def __init__(self) -> None:
        self.last_frame_len = 0

    def append_frame_intro(self, buf: bytearray, time_ns: int) -> None:
        start = len(buf)

        # syncpoint packet
        with nut.append_packet(buf, nut.StartCode.syncpoint) as pkt:
            nut.append_vu(pkt, time_ns, minbytes=10, maxbytes=10)  #! global_key_pts
            nut.append_vu(
                pkt, self.last_frame_len // 16, minbytes=2, maxbytes=2
            )  #! back_ptr_div16

        # frame header (1+10+1+4 = 16 bytes)
        hdr_start = len(buf)
        nut.append_u8(buf, 1)  #! frame_code (1 byte)
        nut.append_vu(buf, time_ns, minbytes=10, maxbytes=10)  #! coded_pts (<=10 bytes)
        nut.append_vu(buf, fb_size // frame_size_msb_mul)  #! data_size_msb (1 byte)
        nut.append_u32(buf, nut.crc32(buf[hdr_start:]))  #! checksum (4 bytes)
        # `==` instead of `<=` because we set minbytes=
        assert len(buf) - hdr_start == frame_max_header

        # return
        self.last_frame_len = (len(buf) - start) + fb_size