west: runners: Implement the west rtt command for jlink

Moves the telnet client into runners/core.py as well, as this is now shared
between openocd and jlink.

Signed-off-by: Tobias Pisani <mail@topisani.dev>
This commit is contained in:
Tobias Pisani 2024-09-10 12:33:30 +02:00 committed by Anas Nashif
parent 18f45b5f06
commit 73c8235881
3 changed files with 70 additions and 35 deletions

View File

@ -17,11 +17,14 @@ import errno
import logging
import os
import platform
import re
import selectors
import shlex
import shutil
import signal
import socket
import subprocess
import re
import sys
from dataclasses import dataclass, field
from functools import partial
from enum import Enum
@ -902,3 +905,34 @@ class ZephyrBinaryRunner(abc.ABC):
# RuntimeError avoids a stack trace saved in run_common.
raise RuntimeError(err)
def run_telnet_client(self, host: str, port: int) -> None:
'''
Run a telnet client for user interaction.
'''
# If a `nc` command is available, run it, as it will provide the best support for
# CONFIG_SHELL_VT100_COMMANDS etc.
if shutil.which('nc') is not None:
client_cmd = ['nc', host, str(port)]
self.run_client(client_cmd)
return
# Otherwise, use a pure python implementation. This will work well for logging,
# but input is line based only.
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((host, port))
sel = selectors.DefaultSelector()
sel.register(sys.stdin, selectors.EVENT_READ)
sel.register(sock, selectors.EVENT_READ)
while True:
events = sel.select()
for key, _ in events:
if key.fileobj == sys.stdin:
text = sys.stdin.readline()
if text:
sock.send(text.encode())
elif key.fileobj == sock:
resp = sock.recv(2048)
if resp:
print(resp.decode())

View File

@ -12,6 +12,8 @@ from pathlib import Path
import shlex
import subprocess
import sys
import socket
import time
import tempfile
from runners.core import ZephyrBinaryRunner, RunnerCaps, FileType
@ -25,6 +27,7 @@ except ImportError:
DEFAULT_JLINK_EXE = 'JLink.exe' if sys.platform == 'win32' else 'JLinkExe'
DEFAULT_JLINK_GDB_PORT = 2331
DEFAULT_JLINK_RTT_PORT = 19021
def is_ip(ip):
try:
@ -49,6 +52,7 @@ class JLinkBinaryRunner(ZephyrBinaryRunner):
gdbserver='JLinkGDBServer',
gdb_host='',
gdb_port=DEFAULT_JLINK_GDB_PORT,
rtt_port=DEFAULT_JLINK_RTT_PORT,
tui=False, tool_opt=[]):
super().__init__(cfg)
self.file = cfg.file
@ -70,6 +74,7 @@ class JLinkBinaryRunner(ZephyrBinaryRunner):
self.gdb_port = gdb_port
self.tui_arg = ['-tui'] if tui else []
self.loader = loader
self.rtt_port = rtt_port
self.tool_opt = []
for opts in [shlex.split(opt) for opt in tool_opt]:
@ -81,9 +86,9 @@ class JLinkBinaryRunner(ZephyrBinaryRunner):
@classmethod
def capabilities(cls):
return RunnerCaps(commands={'flash', 'debug', 'debugserver', 'attach'},
return RunnerCaps(commands={'flash', 'debug', 'debugserver', 'attach', 'rtt'},
dev_id=True, flash_addr=True, erase=True, reset=True,
tool_opt=True, file=True)
tool_opt=True, file=True, rtt=True)
@classmethod
def dev_id_help(cls) -> str:
@ -126,6 +131,10 @@ class JLinkBinaryRunner(ZephyrBinaryRunner):
dest='reset', nargs=0,
action=ToggleAction,
help='obsolete synonym for --reset/--no-reset')
parser.add_argument('--rtt-client', default='JLinkRTTClient',
help='RTT client, default is JLinkRTTClient')
parser.add_argument('--rtt-port', default=DEFAULT_JLINK_RTT_PORT,
help=f'jlink rtt port, defaults to {DEFAULT_JLINK_RTT_PORT}')
parser.set_defaults(reset=False)
@ -142,6 +151,7 @@ class JLinkBinaryRunner(ZephyrBinaryRunner):
loader=args.loader,
gdb_host=args.gdb_host,
gdb_port=args.gdb_port,
rtt_port=args.rtt_port,
tui=args.tui, tool_opt=args.tool_opt)
def print_gdbserver_message(self):
@ -248,6 +258,7 @@ class JLinkBinaryRunner(ZephyrBinaryRunner):
'-singlerun'] +
(['-nogui'] if self.supports_nogui else []) +
(['-rtos', plugin_dir] if rtos else []) +
['-rtttelnetport', str(self.rtt_port)] +
self.tool_opt)
if command == 'flash':
@ -258,6 +269,27 @@ class JLinkBinaryRunner(ZephyrBinaryRunner):
self.require(self.gdbserver)
self.print_gdbserver_message()
self.check_call(server_cmd)
elif command == 'rtt':
self.print_gdbserver_message()
server_cmd += ['-nohalt']
server_proc = self.popen_ignore_int(server_cmd)
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# wait for the port to be open
while server_proc.poll() is None:
try:
sock.connect(('localhost', self.rtt_port))
break
except ConnectionRefusedError:
time.sleep(0.1)
sock.shutdown(socket.SHUT_RDWR)
time.sleep(0.1)
self.run_telnet_client('localhost', self.rtt_port)
except Exception as e:
self.logger.error(e)
finally:
server_proc.terminate()
server_proc.wait()
else:
if self.gdb_cmd is None:
raise ValueError('Cannot debug; gdb is missing')

View File

@ -7,11 +7,8 @@
'''Runner for openocd.'''
import re
import selectors
import shutil
import socket
import subprocess
import sys
import time
from os import path
@ -454,37 +451,9 @@ class OpenOcdBinaryRunner(ZephyrBinaryRunner):
# the port is open now.
self.logger.info("Opening RTT")
time.sleep(0.1) # Give the server a moment to output log messages first
self._run_rtt_client()
self.run_telnet_client('localhost', self.rtt_port)
except Exception as e:
self.logger.error(e)
finally:
server_proc.terminate()
server_proc.wait()
def _run_rtt_client(self):
# If a `nc` command is available, run it, as it will provide the best support for
# CONFIG_SHELL_VT100_COMMANDS etc.
if shutil.which('nc') is not None:
client_cmd = ['nc', 'localhost', str(self.rtt_port)]
self.run_client(client_cmd)
return
# Otherwise, use a pure python implementation. This will work well for logging,
# but input is line based only.
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(('localhost', self.rtt_port))
sel = selectors.DefaultSelector()
sel.register(sys.stdin, selectors.EVENT_READ)
sel.register(sock, selectors.EVENT_READ)
while True:
events = sel.select()
for key, _ in events:
if key.fileobj == sys.stdin:
text = sys.stdin.readline()
if text:
sock.send(text.encode())
elif key.fileobj == sock:
resp = sock.recv(2048)
if resp:
print(resp.decode())