Coverage for pure3270/protocol/tn3270e_header.py: 85%
48 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-11 20:54 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-11 20:54 +0000
1"""TN3270E message header structure for pure3270."""
3import struct
4import logging
5from typing import Optional
6from .utils import (
7 TN3270_DATA,
8 SCS_DATA,
9 RESPONSE,
10 BIND_IMAGE,
11 UNBIND,
12 NVT_DATA,
13 REQUEST,
14 SSCP_LU_DATA,
15 PRINT_EOJ,
16 TN3270E_RSF_NO_RESPONSE,
17 TN3270E_RSF_ERROR_RESPONSE,
18 TN3270E_RSF_ALWAYS_RESPONSE,
19 TN3270E_RSF_POSITIVE_RESPONSE,
20 TN3270E_RSF_NEGATIVE_RESPONSE,
21)
# Module-level logger, named after this module so that applications can
# configure its output through the standard logging hierarchy.
logger = logging.getLogger(__name__)
class TN3270EHeader:
    """
    TN3270E Message Header Structure (RFC 2355 Section 3).

    The TN3270E header is a 5-byte structure with the following fields:

    Byte 0: DATA-TYPE
    Byte 1: REQUEST-FLAG
    Byte 2: RESPONSE-FLAG
    Byte 3-4: SEQ-NUMBER (16-bit big-endian)
    """

    # Wire layout shared by from_bytes() and to_bytes(): three unsigned
    # bytes followed by one unsigned 16-bit integer in network (big-endian)
    # byte order, with no padding.
    _WIRE_FORMAT = "!BBBH"
    _WIRE_SIZE = struct.calcsize(_WIRE_FORMAT)

    # Human-readable names for the known DATA-TYPE values. This is the
    # single source of truth for __repr__() and get_data_type_name().
    # NOTE(review): entry order is kept exactly as before; if two constants
    # ever share a numeric value, the later entry wins.
    _DATA_TYPE_NAMES = {
        TN3270_DATA: "TN3270_DATA",
        SCS_DATA: "SCS_DATA",
        RESPONSE: "RESPONSE",
        BIND_IMAGE: "BIND_IMAGE",
        UNBIND: "UNBIND",
        NVT_DATA: "NVT_DATA",
        REQUEST: "REQUEST",
        SSCP_LU_DATA: "SSCP_LU_DATA",
        PRINT_EOJ: "PRINT_EOJ",
    }

    # Human-readable names for the known RESPONSE-FLAG values. This is the
    # single source of truth for __repr__() and get_response_flag_name().
    # NOTE(review): same ordering caveat as above; confirm against .utils
    # whether any of these constants share a numeric value.
    _RESPONSE_FLAG_NAMES = {
        TN3270E_RSF_NO_RESPONSE: "NO_RESPONSE",
        TN3270E_RSF_ERROR_RESPONSE: "ERROR_RESPONSE",
        TN3270E_RSF_ALWAYS_RESPONSE: "ALWAYS_RESPONSE",
        TN3270E_RSF_POSITIVE_RESPONSE: "POSITIVE_RESPONSE",
        TN3270E_RSF_NEGATIVE_RESPONSE: "NEGATIVE_RESPONSE",
    }

    def __init__(
        self,
        data_type: int = TN3270_DATA,
        request_flag: int = 0,
        response_flag: int = TN3270E_RSF_NO_RESPONSE,
        seq_number: int = 0,
    ) -> None:
        """
        Initialize a TN3270E header.

        The values are stored as given; no range validation happens here.
        Out-of-range values only surface when to_bytes() is called.

        Args:
            data_type: Type of data (TN3270_DATA, SCS_DATA, etc.)
            request_flag: Request flags
            response_flag: Response flags
            seq_number: Sequence number (0-65535)
        """
        self.data_type = data_type
        self.request_flag = request_flag
        self.response_flag = response_flag
        self.seq_number = seq_number

    @classmethod
    def from_bytes(cls, data: bytes) -> Optional["TN3270EHeader"]:
        """
        Parse a TN3270E header from bytes.

        Only the first 5 bytes are examined; any trailing bytes are ignored.

        Args:
            data: At least 5 bytes, starting with the TN3270E header

        Returns:
            TN3270EHeader instance or None if parsing fails
        """
        if len(data) < cls._WIRE_SIZE:
            return None

        try:
            # Slice first so that longer buffers (header + payload) are
            # accepted; struct.unpack requires an exact-size buffer.
            data_type, request_flag, response_flag, seq_number = struct.unpack(
                cls._WIRE_FORMAT, data[: cls._WIRE_SIZE]
            )
        except struct.error:
            return None
        return cls(data_type, request_flag, response_flag, seq_number)

    def to_bytes(self) -> bytes:
        """
        Convert the header to bytes.

        Returns:
            5 bytes representing the TN3270E header

        Raises:
            struct.error: If a field does not fit its wire slot (the three
                single-byte fields must be 0-255, seq_number 0-65535) or is
                not an integer.
        """
        return struct.pack(
            self._WIRE_FORMAT,
            self.data_type,
            self.request_flag,
            self.response_flag,
            self.seq_number,
        )

    def __repr__(self) -> str:
        """String representation of the header.

        Known data types and response flags are shown by name; unknown
        values fall back to a bare two-digit hex literal.
        """
        type_name = self._DATA_TYPE_NAMES.get(
            self.data_type, f"0x{self.data_type:02x}"
        )
        response_name = self._RESPONSE_FLAG_NAMES.get(
            self.response_flag, f"0x{self.response_flag:02x}"
        )

        return (
            f"TN3270EHeader(data_type={type_name}, "
            f"request_flag=0x{self.request_flag:02x}, "
            f"response_flag={response_name}, "
            f"seq_number={self.seq_number})"
        )

    def is_tn3270_data(self) -> bool:
        """Check if this is TN3270_DATA type."""
        return self.data_type == TN3270_DATA

    def is_scs_data(self) -> bool:
        """Check if this is SCS_DATA type."""
        return self.data_type == SCS_DATA

    def is_response(self) -> bool:
        """Check if this is RESPONSE type."""
        return self.data_type == RESPONSE

    def is_error_response(self) -> bool:
        """Check if this is an error response."""
        return self.response_flag == TN3270E_RSF_ERROR_RESPONSE

    def is_negative_response(self) -> bool:
        """Check if this is a negative response."""
        return self.response_flag == TN3270E_RSF_NEGATIVE_RESPONSE

    def is_positive_response(self) -> bool:
        """Check if this is a positive response."""
        return self.response_flag == TN3270E_RSF_POSITIVE_RESPONSE

    def is_always_response(self) -> bool:
        """Check if this is an always response."""
        return self.response_flag == TN3270E_RSF_ALWAYS_RESPONSE

    def get_data_type_name(self) -> str:
        """Get the name of the data type.

        Returns:
            The symbolic name, or "UNKNOWN(0x..)" for unrecognized values.
        """
        return self._DATA_TYPE_NAMES.get(
            self.data_type, f"UNKNOWN(0x{self.data_type:02x})"
        )

    def get_response_flag_name(self) -> str:
        """Get the name of the response flag.

        Returns:
            The symbolic name, or "UNKNOWN(0x..)" for unrecognized values.
        """
        return self._RESPONSE_FLAG_NAMES.get(
            self.response_flag, f"UNKNOWN(0x{self.response_flag:02x})"
        )