shared package

Subpackages

Submodules

shared.controller module

class shared.controller.ControllerInterface(data_model: dataclasses.dataclass, virtual_mode: Optional[bool] = None, state_machine: shared.physical_interface.PhysicalInterface = <shared.physical_interface.PhysicalInterface object>)

Bases: object

This is a class that will handle the data coming from the physical level as well as from the GUI.

data_model: dataclasses.dataclass
set_machine()

Configures the 61851 state machine.

Returns

set_settings()

Sets virtual mode. If False, it will be using an Charge Controller card.

Returns

state_machine: shared.physical_interface.PhysicalInterface = <shared.physical_interface.PhysicalInterface object>
virtual_mode: bool = None

shared.custom_canvas module

class shared.custom_canvas.CustomCanvas(power_data, energy_data, limits, battery=None)

Bases: matplotlib.backends.backend_qt5agg.FigureCanvasQTAgg, matplotlib.animation.TimedAnimation

This is the class that will plot the data and display it properly in 15118 DC use case.

get_plot_data(ax, value=nan, color='b', linestyle='-', label='Present value')

Creates empty arrays needed for an initial drawing. We can customize it.

Parameters
  • ax – The ax to be used.

  • value – The value used to fill the array.

  • color – The line’s color.

  • linestyle – The line’s style.

  • label – The line’s label.

Returns

np.array, Line2D – the resulting array and its line.

new_frame_seq()

Return a new sequence of frame information.

set_subplot(ax, plot_name, limits)
shift_left(plot_values, data, line) None

Shifts array data to the left creating a moving window.

Parameters
  • plot_values – The array-like to be shifted.

  • data – The data the array-like will get.

  • line – The line that should be updated.

Returns

shared.charge_controller_interface module

shared.global_values module

shared.gui module

class shared.gui.GUI

Bases: object

abstract check_state_box()

Checks the boxes depending on the current state.

Returns

abstract connect_signals()

Connects data to their respective widgets.

Returns

abstract retranslate_ui(main_window) None

Retranslates some texts.

Parameters

main_window – The window containing all the widgets.

Returns

abstract set_ev_settings()

Sets EV settings.

Returns

abstract setup_ui(main_window) None

Sets the UI.

Parameters

main_window – The window containing all the widgets.

Returns

abstract update()

Updates the UI.

Returns

abstract update_timer()

Updates the departure time timer.

Returns

shared.log module

class shared.log.CustomFormatter(fmt=None, datefmt=None, style='%', validate=True)

Bases: logging.Formatter

Logging Formatter to add colors and count warning / errors, source: https://stackoverflow.com/questions/384076/how-can-i-color-python-logging-output

FORMATS = {10: '\x1b[38;21m[%(asctime)s] %(levelname)s (%(module)s): %(message)s (%(filename)s:%(lineno)d)\x1b[0m', 20: '\x1b[38;21m[%(asctime)s] %(levelname)s (%(module)s): %(message)s\x1b[0m', 30: '\x1b[33;21m[%(asctime)s] %(levelname)s (%(module)s): %(message)s (%(filename)s:%(lineno)d)\x1b[0m', 40: '\x1b[31;21m[%(asctime)s] %(levelname)s (%(module)s): %(message)s (%(filename)s:%(lineno)d)\x1b[0m', 50: '\x1b[31;1m[%(asctime)s] %(levelname)s (%(module)s): %(message)s (%(filename)s:%(lineno)d)\x1b[0m'}
bold_red = '\x1b[31;1m'
format(record)

Format the specified record as text.

The record’s attribute dictionary is used as the operand to a string formatting operation which yields the returned string. Before formatting the dictionary, a couple of preparatory steps are carried out. The message attribute of the record is computed using LogRecord.getMessage(). If the formatting string uses the time (as determined by a call to usesTime(), formatTime() is called to format the event time. If there is exception information, it is formatted using formatException() and appended to the message.

grey = '\x1b[38;21m'
info_format = '[%(asctime)s] %(levelname)s (%(module)s): %(message)s'
red = '\x1b[31;21m'
reset = '\x1b[0m'
yellow = '\x1b[33;21m'

shared.message_handling module

class shared.message_handling.MessageHandler(*args, **kwargs)

Bases: object

This is the class that will process every single V2GTP message.

static decode(exi_contents: bytes, schema: org.openexi.schema.org.openexi.schema.EXISchema) str

Turns encoded EXI bytes to human-readable string. Relies on Java classes.

Parameters
  • exi_contents – The EXI encoded contents.

  • schema – The EXI schema used.

Returns

str – the decoded string.

static encode(xml_contents: str, schema: org.openexi.schema.org.openexi.schema.EXISchema) str

Turns a human-readable string to an EXI-encoded string. Relies on Java classes.

Parameters
  • xml_contents – The XML string to be encoded.

  • schema – The EXI schema used.

Returns

str – the encoded result.

exi_to_supported_app(exi_contents) str
exi_to_v2g_msg(exi_contents) str
static is_payload_length_correct(v2gtp_message: shared.messages.V2GTPMessage) bool
static is_payload_type_correct(v2gtp_message: shared.messages.V2GTPMessage) bool
is_valid(v2gtp_message: shared.messages.V2GTPMessage) bool
static is_version_valid(v2gtp_message: shared.messages.V2GTPMessage) bool
static is_xml_valid(xml, path_app_protocol_xsd, path_common_messages_xsd, path_dc_messages_xsd)

This method allows to check if an XML message is valid using the corresponding XSD.

Parameters
  • xml – Input XML file

  • path_app_protocol_xsd – App Protocol XSD Path

  • path_common_messages_xsd – Common Messages XSD Path

  • path_dc_messages_xsd – DC Messages XSD Path

Returns

boolean statement - true: valid, false: invalid

static marshall(message) str

Turns an XML object to a string.

Parameters

message – The XML object to be processed.

Returns

str – the resulting XML string.

static open_exi_schema(filepath: str) org.openexi.schema.org.openexi.schema.EXISchema

Loads EXISchema. Relies on Java classes.

Parameters

filepath – The path to the EXIG file.

Returns

EXISchema – the object containing the schema.

supported_app_to_exi(xml_contents) bytes
static unmarshall(xml)

Extracts data from XML string and turns it to an object understood by the machine.

Parameters

xml – The XLM string to extract data from.

Returns

object – the resulting XML object.

v2g_msg_to_exi(xml_contents) bytes
class shared.message_handling.Singleton

Bases: type

This is a singleton design pattern class.

shared.messages module

class shared.messages.EXIMessage(*args: Any, **kargs: Any)

Bases: scapy.base_classes.Gen[scapy.packet.Packet]

This is a class that represents the messages that have an EXI payload. Inherits from V2GTPMessage.

aliastypes = [<class 'shared.messages.EXIMessage'>, <class 'shared.messages.V2GTPMessage'>, <class 'scapy.packet.Packet'>]
fields_desc: Sequence[AnyField] = [<XByteField (V2GTPMessage,SupportedAppMessage,EXIMessage,SDPMessage).protocolVersion>, <XByteField (V2GTPMessage,SupportedAppMessage,EXIMessage,SDPMessage).inverseProtocolVersion>, <XShortEnumField (V2GTPMessage,SupportedAppMessage,EXIMessage,SDPMessage).payloadType>]
fill()
payload_guess: List[Tuple[Dict[str, Any], Type[Packet]]] = [({'payloadType': 32770}, <class 'shared.payloads.EXIPayload'>)]
class shared.messages.SDPMessage(*args: Any, **kargs: Any)

Bases: scapy.base_classes.Gen[scapy.packet.Packet]

This is a class that represents SECC Discovery Protocol messages. Inherits from V2GTPMessage.

aliastypes = [<class 'shared.messages.SDPMessage'>, <class 'shared.messages.V2GTPMessage'>, <class 'scapy.packet.Packet'>]
fields_desc: Sequence[AnyField] = [<XByteField (V2GTPMessage,SupportedAppMessage,EXIMessage,SDPMessage).protocolVersion>, <XByteField (V2GTPMessage,SupportedAppMessage,EXIMessage,SDPMessage).inverseProtocolVersion>, <XShortEnumField (V2GTPMessage,SupportedAppMessage,EXIMessage,SDPMessage).payloadType>]
fill()
payload_guess: List[Tuple[Dict[str, Any], Type[Packet]]] = [({'payloadType': 36864}, <class 'shared.payloads.SDPReqPayload'>), ({'payloadType': 36865}, <class 'shared.payloads.SDPResPayload'>)]
class shared.messages.SupportedAppMessage(*args: Any, **kargs: Any)

Bases: scapy.base_classes.Gen[scapy.packet.Packet]

This is a class that represents SupportedAppProtocol messages. Inherits from V2GTPMessage.

aliastypes = [<class 'shared.messages.SupportedAppMessage'>, <class 'shared.messages.V2GTPMessage'>, <class 'scapy.packet.Packet'>]
fields_desc: Sequence[AnyField] = [<XByteField (V2GTPMessage,SupportedAppMessage,EXIMessage,SDPMessage).protocolVersion>, <XByteField (V2GTPMessage,SupportedAppMessage,EXIMessage,SDPMessage).inverseProtocolVersion>, <XShortEnumField (V2GTPMessage,SupportedAppMessage,EXIMessage,SDPMessage).payloadType>]
fill()
payload_guess: List[Tuple[Dict[str, Any], Type[Packet]]] = [({'payloadType': 32769}, <class 'shared.payloads.EXIPayload'>)]
class shared.messages.V2GTPMessage(*args: Any, **kargs: Any)

Bases: scapy.base_classes.Gen[scapy.packet.Packet]

This is the class that represents V2G messages.

aliastypes = [<class 'shared.messages.V2GTPMessage'>, <class 'scapy.packet.Packet'>]
fields_desc: Sequence[AnyField] = [<XByteField (V2GTPMessage,SupportedAppMessage,EXIMessage,SDPMessage).protocolVersion>, <XByteField (V2GTPMessage,SupportedAppMessage,EXIMessage,SDPMessage).inverseProtocolVersion>, <XShortEnumField (V2GTPMessage,SupportedAppMessage,EXIMessage,SDPMessage).payloadType>]
abstract fill()
get_inverse_protocol_version()
get_payload_length()
get_payload_type()
get_protocol_version()

shared.payloads module

class shared.payloads.EXIPayload(*args: Any, **kargs: Any)

Bases: scapy.base_classes.Gen[scapy.packet.Packet]

This is the payload for EXI format contents.

aliastypes = [<class 'shared.payloads.EXIPayload'>, <class 'shared.payloads.Payload'>, <class 'scapy.packet.Packet'>]
fields_desc: Sequence[AnyField] = [<XIntField (EXIPayload).payloadLength>, <XStrLenField (EXIPayload).payloadContent>]
post_build(pkt: scapy.packet.Packet, pay: scapy.packet.Packet) scapy.packet.Packet

Allows calculation of the payload’s length after it’s been built.

Parameters
  • pkt – The current layer.

  • pay – The actual payload.

:return : pkt + pay – concatenation of layers.

class shared.payloads.Payload(*args: Any, **kargs: Any)

Bases: scapy.base_classes.Gen[scapy.packet.Packet]

This is the basic payload class for V2G messages.

aliastypes = [<class 'shared.payloads.Payload'>, <class 'scapy.packet.Packet'>]
class shared.payloads.SDPReqPayload(*args: Any, **kargs: Any)

Bases: scapy.base_classes.Gen[scapy.packet.Packet]

This is the payload for SECC Discovery Protocol Request. 0x00: TCP Transport Protocol 0x00: Security secured with TLS

aliastypes = [<class 'shared.payloads.SDPReqPayload'>, <class 'shared.payloads.Payload'>, <class 'scapy.packet.Packet'>]
fields_desc: Sequence[AnyField] = [<XIntField (SDPReqPayload).PayloadLength>, <XByteField (SDPReqPayload).SecurityProtocol>, <XByteField (SDPReqPayload).TransportProtocol>]
class shared.payloads.SDPResPayload(*args: Any, **kargs: Any)

Bases: scapy.base_classes.Gen[scapy.packet.Packet]

This is the payload for SECC Discovery Protocol Response. 0x00: TCP Transport Protocol 0x00: Security secured with TLS

aliastypes = [<class 'shared.payloads.SDPResPayload'>, <class 'shared.payloads.Payload'>, <class 'scapy.packet.Packet'>]
fields_desc: Sequence[AnyField] = [<XIntField (SDPResPayload).PayloadLength>, <IP6Field (SDPResPayload).TargetAddress>, <XShortField (SDPResPayload).TargetPort>, <XByteField (SDPResPayload).SecurityProtocol>, <XByteField (SDPResPayload).TransportProtocol>]

shared.physical_interface module

class shared.physical_interface.PhysicalInterface

Bases: transitions.extensions.asyncio.AsyncMachine

This is a state machine class that will simulate the changes at a physical level.

attach(observer)
get_ev_state()
is_state_a()
is_state_b()
is_state_c()
notify()
property pwm
pwm_on()

Only two levels of pwm are handled (PWM equals 5% or 100%). Therfore this function is named this way. It returns True when pwm value is between 3 and 7.

Returns

bool – if pwm value is between 3 and 7.

async set_a()
Sets EV state to state A,

this means open circuit between CP and PE

Returns

async set_b()
Sets EV state to B, Default/Initial position

resistance between CP and PE = 2.7 K Ohms

Returns

async set_c()
Sets EV state to C, EV Ready position

resistance between CP and PE = 882 Ohms

Returns

set_ev_state(value)
async set_pwm()

Only two levels of pwm are handled. Therefore this function is named this way. Sets pwm to 5 percent.

Returns

async unset_pwm()

Only two levels of pwm are handled. Therefore this function is named this way. Sets pwm to 100.

Returns

shared.reaction_message module

class shared.reaction_message.ChangeState

Bases: shared.reaction_message.ReactionToIncomingMessage

This is a reaction to a message that tells it to change the state. Inherits from ReactionToIncomingMessage. Not really used, so consider removing it.

class shared.reaction_message.PauseSession

Bases: shared.reaction_message.ReactionToIncomingMessage

This is a reaction to a message that tells it to pause the session. Inherits from ReactionToIncomingMessage. Not really used, to be implemented in later versions.

class shared.reaction_message.ReactionToIncomingMessage

Bases: object

This is a class that contains any useful information about an incoming message and the reaction to it.

property extra_data
property message
property next_state
class shared.reaction_message.SendMessage

Bases: shared.reaction_message.ReactionToIncomingMessage

This is a reaction to a message that tells it to send it. Inherits from ReactionToIncomingMessage.

class shared.reaction_message.TerminateSession

Bases: shared.reaction_message.ReactionToIncomingMessage

This is a reaction to a message that tells it to end the session. Inherits from ReactionToIncomingMessage.

shared.session module

class shared.session.CommunicationSession(controller: shared.controller.ControllerInterface, *args, **kwargs)

Bases: transitions.core.Machine

This is a charging session’s implementation as understood from 15118.

property controller
property message_timer
pause_session()
reset_message_timer()
reset_sequence_timer()
save_session_data(extra_data)
property sequence_timer
property session_parameters
stop_session()
update_message_timer()
update_sequence_timer()
update_timers()
class shared.session.SessionParameters(session_id: Optional[bytes] = None, ip_address: Optional[str] = None, port: Optional[int] = None)

Bases: object

This is a dataclass that will hold any useful information for the charging session.

ip_address: str = None
port: int = None
session_id: bytes = None

shared.session_handler module

class shared.session_handler.SessionHandler

Bases: object

This a class that will handle the different charging sessions during the V2G communication.

property current_session
abstract get_config()

Gets the needed configuration to run a SessionHandler.

Returns

property interface
property sessions
set_network_parameters()

Sets the settings according to the configuration file.

Returns

abstract start_new_session(controller)

Starts a new communication session.

Parameters

controller – The controller that will handle the data.

Returns

property tcp_port

shared.state module

class shared.state.V2GState(*args, **kwargs)

Bases: transitions.core.State

This a class that will hold everything everything needed in a 15118 state.

property ongoing_perf_time
property ongoing_timeout
abstract process_payload(payload) shared.reaction_message.ReactionToIncomingMessage

Processes the payload and readies the next message.

Parameters

payload – The data that will be processed.

Returns

ReactionToIncomingMessage – the reaction the processed data.

property seq_perf_time
property session
property timeout

shared.threading module

class shared.threading.Worker(controller: shared.controller.ControllerInterface, callback)

Bases: PyQt5.QtCore.QObject

This is a class that will handle a specific task in a thread.

finished
run()

shared.timer module

class shared.timer.MessageTimer(timeout)

Bases: shared.timer.Timer

Message timer as specified in the standard. Inherits from Timer class.

async message_timeout_callback()

Callback for MessageTimer.

Returns

class shared.timer.SequenceTimer(timeout)

Bases: shared.timer.Timer

Sequence timer as specified in the standard. Inherits from Timer class.

async performance_timeout_callback()

Callback for SequenceTimer.

Returns

class shared.timer.Timer(timeout: int, callback)

Bases: object

This is timer implementation that will call a function once its timeout has been reached.

cancel()

Cancels Timer.

Returns

async job()

Calls callback asynchronously after timeout.

Returns

start()

Starts Timer.

Returns

property timeout

shared.utils module

shared.utils.float_to_dc_rational(value: float) shared.xml_classes.dc.v2_g_ci_common_types.RationalNumberType

Casts a float to a DcRationalNumberType.

Parameters

value – The value to cast.

Returns

DcRationalNumberType – the cast result.

shared.utils.greater_rational(rational_1, rational_2)

Compares two RationalNumberType.

Parameters
  • rational_1 – A RationalNumberType.

  • rational_2 – A RationalNumberType.

Returns

RationalNumberType – the greater value.

shared.utils.lower_rational(rational_1, rational_2)

Compares two RationalNumberType.

Parameters
  • rational_1 – A RationalNumberType.

  • rational_2 – A RationalNumberType.

Returns

RationalNumberType – the lower value.

shared.utils.negative_dc_rational(value)

Turns a RationalNumberType negative.

Parameters

value – The value to change.

Returns

RationalNumberType – the result.

shared.utils.rational_to_float(value) float

Casts a RationalNumberType to a float.

Parameters

value – The value to cast.

Returns

float – the cast result.

Module contents

A module containing everything needed by the EVCC and the SECC to run edf15118-20 project.