Source code for iceoryx2.request_response_extensions
# Copyright (c) 2025 Contributors to the Eclipse Foundation## See the NOTICE file(s) distributed with this work for additional# information regarding copyright ownership.## This program and the accompanying materials are made available under the# terms of the Apache Software License 2.0 which is available at# https://www.apache.org/licenses/LICENSE-2.0, or the MIT license# which is available at https://opensource.org/licenses/MIT.## SPDX-License-Identifier: Apache-2.0 OR MIT"""Strong type safe extensions for the request-response messaging pattern."""importctypesfromtypingimportAny,Type,TypeVar,get_args,get_originfrom._iceoryx2import*from.sliceimportSlicefrom.type_nameimportget_type_nameReqT=TypeVar("ReqT",bound=ctypes.Structure)ResT=TypeVar("ResT",bound=ctypes.Structure)
def _describe_payload_type(payload_type):
    """
    Derives the type details of a payload type.

    Returns a tuple `(type_name, type_variant, size, alignment)`. For a
    `Slice[T]` the name, size and alignment are taken from the contained type
    `T` and the variant is `TypeVariant.Dynamic`; for every other type they
    are taken from the type itself and the variant is `TypeVariant.FixedSize`.
    """
    if get_origin(payload_type) is Slice:
        (contained_type,) = get_args(payload_type)
        return (
            get_type_name(contained_type),
            TypeVariant.Dynamic,
            ctypes.sizeof(contained_type),
            ctypes.alignment(contained_type),
        )

    return (
        get_type_name(payload_type),
        TypeVariant.FixedSize,
        ctypes.sizeof(payload_type),
        ctypes.alignment(payload_type),
    )


def request_response(
    self: ServiceBuilder, request: Type[ReqT], response: Type[ResT]
) -> "ServiceBuilderRequestResponse":
    """
    Returns the `ServiceBuilderRequestResponse` to create a new request-response service.

    The request/response payload ctype must be provided as argument. A payload
    may also be given as `Slice[T]`, in which case the type details describe
    the contained type `T` and the payload is marked as dynamic.

    Args:
        request: the request payload type (or `Slice` of it).
        response: the response payload type (or `Slice` of it).

    Returns:
        The builder with request/response payload type details configured and
        both header types initialized to the unit type `"()"` (size 0,
        alignment 1).
    """
    # NOTE: this function must stay at module level. The double-underscore
    # attribute names below are looked up literally on the builder object; inside
    # a class body Python would name-mangle them.
    (
        request_type_name,
        request_type_variant,
        request_type_size,
        request_type_align,
    ) = _describe_payload_type(request)
    (
        response_type_name,
        response_type_variant,
        response_type_size,
        response_type_align,
    ) = _describe_payload_type(response)

    result = self.__request_response()
    # Remember the Python-side payload types on the builder.
    result.__set_request_payload_type(request)
    result.__set_response_payload_type(response)

    return (
        result.__request_payload_type_details(
            TypeDetail.new()
            .type_variant(request_type_variant)
            .type_name(TypeName.new(request_type_name))
            .size(request_type_size)
            .alignment(request_type_align)
        )
        .__request_header_type_details(
            TypeDetail.new()
            .type_variant(TypeVariant.FixedSize)
            .type_name(TypeName.new("()"))
            .size(0)
            .alignment(1)
        )
        .__response_payload_type_details(
            TypeDetail.new()
            .type_variant(response_type_variant)
            .type_name(TypeName.new(response_type_name))
            .size(response_type_size)
            .alignment(response_type_align)
        )
        .__response_header_type_details(
            TypeDetail.new()
            .type_variant(TypeVariant.FixedSize)
            .type_name(TypeName.new("()"))
            .size(0)
            .alignment(1)
        )
    )
def set_request_header(
    self: "ServiceBuilderRequestResponse", request: Type[ReqT]
) -> "ServiceBuilderRequestResponse":
    """
    Sets the request header type for the service.

    Args:
        request: the ctypes type used as request header; its name, size and
            alignment are stored as fixed-size type details.

    Returns:
        The builder returned by the type-detail call, with the request header
        type recorded on it.
    """
    type_name = get_type_name(request)
    header_details = (
        TypeDetail.new()
        .type_variant(TypeVariant.FixedSize)
        .type_name(TypeName.new(type_name))
        .size(ctypes.sizeof(request))
        .alignment(ctypes.alignment(request))
    )
    result = self.__request_header_type_details(header_details)
    # Remember the Python-side header type on the resulting builder.
    result.__set_request_header_type(request)
    return result
def set_response_header(
    self: "ServiceBuilderRequestResponse", response: Type[ResT]
) -> "ServiceBuilderRequestResponse":
    """
    Sets the response header type for the service.

    Args:
        response: the ctypes type used as response header; its name, size and
            alignment are stored as fixed-size type details.

    Returns:
        The builder returned by the type-detail call, with the response header
        type recorded on it.
    """
    type_name = get_type_name(response)
    header_details = (
        TypeDetail.new()
        .type_variant(TypeVariant.FixedSize)
        .type_name(TypeName.new(type_name))
        .size(ctypes.sizeof(response))
        .alignment(ctypes.alignment(response))
    )
    result = self.__response_header_type_details(header_details)
    # Remember the Python-side header type on the resulting builder.
    result.__set_response_header_type(response)
    return result
def request_payload(self: Any) -> Any:
    """
    Gives access to the request's payload.

    For a `Slice[T]` payload type a `Slice` over `payload_ptr` with the stored
    slice length and element type `T` is returned; otherwise `payload_ptr` is
    cast to a `ctypes.POINTER` of the payload type.
    """
    payload_type = self.__request_payload_type_details
    assert payload_type is not None

    if get_origin(payload_type) is not Slice:
        return ctypes.cast(self.payload_ptr, ctypes.POINTER(payload_type))

    (element_type,) = get_args(payload_type)
    return Slice(self.payload_ptr, self.__slice_len, element_type)
def response_payload(self: Any) -> Any:
    """
    Gives access to the response's payload.

    For a `Slice[T]` payload type a `Slice` over `payload_ptr` with the stored
    slice length and element type `T` is returned; otherwise `payload_ptr` is
    cast to a `ctypes.POINTER` of the payload type.
    """
    payload_type = self.__response_payload_type_details
    assert payload_type is not None

    if get_origin(payload_type) is not Slice:
        return ctypes.cast(self.payload_ptr, ctypes.POINTER(payload_type))

    (element_type,) = get_args(payload_type)
    return Slice(self.payload_ptr, self.__slice_len, element_type)
def request_header(self: Any) -> Any:
    """
    Gives access to the request header.

    Casts `user_header_ptr` to a `ctypes.POINTER` of the stored request header
    type and returns it. The header type must have been set.
    """
    header_type = self.__request_header_type_details
    assert header_type is not None

    header_address = self.user_header_ptr
    pointer_type = ctypes.POINTER(header_type)
    return ctypes.cast(header_address, pointer_type)
def response_header(self: Any) -> Any:
    """
    Gives access to the response header.

    Casts `user_header_ptr` to a `ctypes.POINTER` of the stored response header
    type and returns it. The header type must have been set.
    """
    header_type = self.__response_header_type_details
    assert header_type is not None

    header_address = self.user_header_ptr
    pointer_type = ctypes.POINTER(header_type)
    return ctypes.cast(header_address, pointer_type)
def write_request_payload(self: RequestMutUninit, t: ReqT) -> RequestMut:
    """
    Writes the provided payload into the request.

    Args:
        t: a ctypes instance (not the class) whose bytes are copied into the
            request's payload memory.

    Returns:
        The result of `assume_init()` on this request.

    Raises:
        AssertionError: if the request payload type is not set, or the size or
            alignment of `t` differs from that of the request payload type.
    """
    payload_type = self.__request_payload_type_details
    assert payload_type is not None, "the request payload type is not set"

    payload_size = ctypes.sizeof(t)
    # Guard the raw memory copy below against a mismatching payload layout.
    assert payload_size == ctypes.sizeof(
        payload_type
    ), "size of the provided payload does not match the request payload type"
    assert ctypes.alignment(t) == ctypes.alignment(
        payload_type
    ), "alignment of the provided payload does not match the request payload type"

    ctypes.memmove(self.payload_ptr, ctypes.byref(t), payload_size)
    return self.assume_init()
def write_response_payload(self: ResponseMutUninit, t: ResT) -> ResponseMut:
    """
    Writes the provided payload into the response.

    Args:
        t: a ctypes instance (not the class) whose bytes are copied into the
            response's payload memory.

    Returns:
        The result of `assume_init()` on this response.

    Raises:
        AssertionError: if the response payload type is not set, or the size or
            alignment of `t` differs from that of the response payload type.
    """
    payload_type = self.__response_payload_type_details
    assert payload_type is not None, "the response payload type is not set"

    payload_size = ctypes.sizeof(t)
    # Guard the raw memory copy below against a mismatching payload layout.
    assert payload_size == ctypes.sizeof(
        payload_type
    ), "size of the provided payload does not match the response payload type"
    assert ctypes.alignment(t) == ctypes.alignment(
        payload_type
    ), "alignment of the provided payload does not match the response payload type"

    ctypes.memmove(self.payload_ptr, ctypes.byref(t), payload_size)
    return self.assume_init()
def loan_uninit_request(self: Client) -> RequestMutUninit:
    """
    Loans/allocates memory from the underlying data segment.

    The payload of the returned request is uninitialized; the user has to
    initialize it before the request can be sent. Must not be used when the
    request payload type is a `Slice` (asserted). On failure it returns
    `LoanError` describing the failure.
    """
    payload_type = self.__request_payload_type_details
    assert get_origin(payload_type) is not Slice
    return self.__loan_uninit()
def loan_uninit_response(self: ActiveRequest) -> ResponseMutUninit:
    """
    Loans/allocates memory from the underlying data segment.

    The payload of the returned response is uninitialized; the user has to
    initialize it before the response can be sent. Must not be used when the
    response payload type is a `Slice` (asserted). On failure it returns
    `LoanError` describing the failure.
    """
    payload_type = self.__response_payload_type_details
    assert get_origin(payload_type) is not Slice
    return self.__loan_uninit()
def loan_slice_uninit_request(
    self: Client, number_of_elements: int
) -> RequestMutUninit:
    """
    Loans/allocates memory for a slice from the underlying data segment.

    The payload of the returned request is uninitialized; the user has to
    initialize it before the request can be sent. Fails (assertion) when the
    request payload type is not a `Slice`. On failure it returns `LoanError`
    describing the failure.
    """
    payload_type = self.__request_payload_type_details
    is_slice = get_origin(payload_type) is Slice
    assert is_slice
    return self.__loan_slice_uninit(number_of_elements)
def loan_slice_uninit_response(
    self: ActiveRequest, number_of_elements: int
) -> ResponseMutUninit:
    """
    Loans/allocates memory for a slice from the underlying data segment.

    The payload of the returned response is uninitialized; the user has to
    initialize it before the response can be sent. Fails (assertion) when the
    response payload type is not a `Slice`. On failure it returns `LoanError`
    describing the failure.
    """
    payload_type = self.__response_payload_type_details
    is_slice = get_origin(payload_type) is Slice
    assert is_slice
    return self.__loan_slice_uninit(number_of_elements)
def initial_max_slice_len_request(
    self: PortFactoryClient, value: int
) -> PortFactoryClient:
    """
    Sets the maximum slice length that a user can allocate.

    Only valid when the request payload type is a `Slice` (asserted).
    """
    payload_type = self.__request_payload_type_details
    is_slice = get_origin(payload_type) is Slice
    assert is_slice
    return self.__initial_max_slice_len(value)
def initial_max_slice_len_response(
    self: PortFactoryServer, value: int
) -> PortFactoryServer:
    """
    Sets the maximum slice length that a user can allocate.

    Only valid when the response payload type is a `Slice` (asserted).
    """
    payload_type = self.__response_payload_type_details
    is_slice = get_origin(payload_type) is Slice
    assert is_slice
    return self.__initial_max_slice_len(value)
def allocation_strategy_request(
    self: PortFactoryClient, value: AllocationStrategy
) -> PortFactoryClient:
    """
    Defines the allocation strategy that is used when the memory is exhausted.

    Only valid when the request payload type is a `Slice` (asserted).
    """
    payload_type = self.__request_payload_type_details
    is_slice = get_origin(payload_type) is Slice
    assert is_slice
    return self.__allocation_strategy(value)
def allocation_strategy_response(
    self: PortFactoryServer, value: AllocationStrategy
) -> PortFactoryServer:
    """
    Defines the allocation strategy that is used when the memory is exhausted.

    Only valid when the response payload type is a `Slice` (asserted).
    """
    payload_type = self.__response_payload_type_details
    is_slice = get_origin(payload_type) is Slice
    assert is_slice
    return self.__allocation_strategy(value)
def send_request_copy(self: Client, t: ReqT) -> PendingResponse:
    """
    Sends a copy of the provided payload.

    Loans an uninitialized request, copies the bytes of `t` into its payload
    memory and sends it.

    Args:
        t: a ctypes instance (not the class) holding the request payload.

    Returns:
        The result of `send()` on the initialized request.

    Raises:
        AssertionError: if the request payload type is not set, or the size or
            alignment of `t` differs from that of the loaned request's payload
            type.
    """
    assert (
        self.__request_payload_type_details is not None
    ), "the request payload type is not set"

    request_uninit = self.__loan_uninit()
    loaned_payload_type = request_uninit.__request_payload_type_details

    payload_size = ctypes.sizeof(t)
    # Guard the raw memory copy below against a mismatching payload layout.
    assert payload_size == ctypes.sizeof(
        loaned_payload_type
    ), "size of the provided payload does not match the request payload type"
    assert ctypes.alignment(t) == ctypes.alignment(
        loaned_payload_type
    ), "alignment of the provided payload does not match the request payload type"

    ctypes.memmove(request_uninit.payload_ptr, ctypes.byref(t), payload_size)
    request = request_uninit.assume_init()
    return request.send()
def send_response_copy(self: ActiveRequest, t: ResT) -> Any:
    """
    Sends a copy of the provided payload.

    Loans an uninitialized response, copies the bytes of `t` into its payload
    memory and sends it.

    Args:
        t: a ctypes instance (not the class) holding the response payload.

    Returns:
        The result of `send()` on the initialized response.

    Raises:
        AssertionError: if the response payload type is not set, or the size or
            alignment of `t` differs from that of the loaned response's payload
            type.
    """
    assert (
        self.__response_payload_type_details is not None
    ), "the response payload type is not set"

    response_uninit = self.__loan_uninit()
    loaned_payload_type = response_uninit.__response_payload_type_details

    payload_size = ctypes.sizeof(t)
    # Guard the raw memory copy below against a mismatching payload layout.
    assert payload_size == ctypes.sizeof(
        loaned_payload_type
    ), "size of the provided payload does not match the response payload type"
    assert ctypes.alignment(t) == ctypes.alignment(
        loaned_payload_type
    ), "alignment of the provided payload does not match the response payload type"

    ctypes.memmove(response_uninit.payload_ptr, ctypes.byref(t), payload_size)
    response = response_uninit.assume_init()
    return response.send()