o
    <id                     @   s  U d Z ddlmZ ddlmZmZmZmZ ddlmZm	Z	m
Z
mZmZmZ ddlmZmZ ddlmZ ddlmZ ddlmZmZmZ erWdd	lmZ dd
lmZ ddlmZ daee e d< dZ!dZ"edgddd							dsde#de$dee# dee% dee# dee& dee' dee% dee% ddfddZ(dd de$ddfd!d"Z)	#dtd$e#d%e	d&e%ddfd'd(Z*ed)d*d+dud,d-Z+ed)d*d+d.ed  ddfd/d0Z,d$e#ddfd1d2Z-d$e#de$ddfd3d4Z.		dvd5ee d6ee& ddfd7d8Z/d9e#d:e#d;e'ddfd<d=Z0d$e#ddfd>d?Z1							@dwdee# dee% dAee# dBee% d6ee& dee% dCe'ddfdDdEZ2		dvdFedGee# dHee' ddfdIdJZ3edKgdLdd			dxdMe$dNee# dOee# dKee# ddf
dPdQZ4edKgdLdd		dvdOee# dKee# ddfdRdSZ5de#de%dAe#dBe%d:e#ddfdTdUZ6	dydVee fdWdXZ7dudYdZZ8	dyd[ed$ee# ddfd\d]Z9	dyd^e%d_ee
 ddfd`daZ:dbe%ddfdcddZ;dedfdgdfd_eddfdhdiZ<dee fdjdkZ=dedl fdmdnZ>dudodpZ?de'fdqdrZ@dS )zaT  
Simple, clean API for recording observability metrics.

This module provides a straightforward interface for Redis core code to record
metrics without needing to know about OpenTelemetry internals.

Usage in Redis core code:
    from redis.observability.recorder import record_operation_duration

    start_time = time.monotonic()
    # ... execute Redis command ...
    record_operation_duration(
        command_name='SET',
        duration_seconds=time.monotonic() - start_time,
        server_address='localhost',
        server_port=6379,
        db_namespace='0',
        error=None
    )
    )datetime)TYPE_CHECKINGCallableListOptional)AttributeBuilderConnectionState	CSCReason	CSCResultGeoFailoverReasonPubSubDirection)CloseReasonRedisMetricsCollector)get_observability_instance)!get_observables_registry_instance)deprecated_argsdeprecated_functionstr_if_bytes)ConnectionPoolInterface)SyncDatabase)
OTelConfigN_metrics_collector	csc_itemsconnection_count
batch_sizezXThe batch_size argument is no longer used and will be removed in the next major version.z7.2.1)args_to_warnreasonversioncommand_nameduration_secondsserver_addressserver_portdb_namespaceerroris_blockingretry_attemptsreturnc	           	      C   sT   t du rt a t du rdS zt j| |||||||||d
 W dS  ty)   Y dS w )a  
    Record a Redis command execution duration.

    This is a simple, clean API that Redis core code can call directly.
    If observability is not enabled, this returns immediately with zero overhead.

    Args:
        command_name: Redis command name (e.g., 'GET', 'SET')
        duration_seconds: Command execution time in seconds
        server_address: Redis server address
        server_port: Redis server port
        db_namespace: Redis database index
        error: Exception if command failed, None if successful
        is_blocking: Whether the operation is a blocking command
        batch_size: Number of commands in batch (for pipelines/transactions)
        retry_attempts: Number of retry attempts made

    Example:
        >>> start = time.monotonic()
        >>> # ... execute command ...
        >>> record_operation_duration('SET', time.monotonic() - start, 'localhost', 6379, '0')
    N)
r   r   r    r!   r"   
error_typenetwork_peer_addressnetwork_peer_portr$   r%   )r   _get_or_create_collectorrecord_operation_duration	Exception)	r   r   r    r!   r"   r#   r$   r   r%    r-   [/root/parts/websockify/install/lib/python3.10/site-packages/redis/observability/recorder.pyr+   2   s(   )r+   connection_poolr   c                 C   D   t du rt a t du rdS z
t j| |d W dS  ty!   Y dS w )as  
    Record connection creation time.

    Args:
        connection_pool: Connection pool implementation
        duration_seconds: Time taken to create connection in seconds

    Example:
        >>> start = time.monotonic()
        >>> # ... create connection ...
        >>> record_connection_create_time('ConnectionPool<localhost:6379>', time.monotonic() - start)
    Nr/   r   )r   r*   record_connection_create_timer,   r1   r-   r-   r.   r2   t   s   r2      	pool_nameconnection_statecounterc                 C   F   t du rt a t du rdS zt j| ||d W dS  ty"   Y dS w )a  
    Record a connection count change for a single state.

    Args:
        pool_name: Connection pool identifier
        connection_state: State to update (IDLE or USED)
        counter: Number to add (positive) or subtract (negative)

    Example:
        # New connection created (goes to IDLE first)
        >>> record_connection_count('pool_abc123', ConnectionState.IDLE, 1)

        # Acquire from pool (transition)
        >>> record_connection_count('pool_abc123', ConnectionState.IDLE, -1)
        >>> record_connection_count('pool_abc123', ConnectionState.USED, 1)

        # Release to pool (transition)
        >>> record_connection_count('pool_abc123', ConnectionState.USED, -1)
        >>> record_connection_count('pool_abc123', ConnectionState.IDLE, 1)

        # Pool disconnect 5 idle connections
        >>> record_connection_count('pool_abc123', ConnectionState.IDLE, -5)
    Nr4   r5   r6   )r   r*   record_connection_countr,   r8   r-   r-   r.   r9      s   r9   z{Connection count is now tracked via record_connection_count(). This functionality will be removed in the next major versionz7.4.0)r   r   c                  C   sB   t  } | du r	dS dd }z	| j|d W dS  ty    Y dS w )zB
    Initialize observable gauge for connection count metric.
    Nc                 S   .   t  }|t}g }|D ]}||  q|S N)r   getCONNECTION_COUNT_REGISTRY_KEYextend__observables_registry	callbacksobservationscallbackr-   r-   r.   observable_callback      
z2init_connection_count.<locals>.observable_callbackrD   )r*   init_connection_countr,   )	collectorrE   r-   r-   r.   rH      s   	
rH   connection_poolsc                    sZ   t  }|du r	dS zddlm   fdd}t }|t| W dS  ty,   Y dS w )zG
    Add connection pools to connection count observable registry.
    Nr   Observationc                     s6   g } D ]}|  D ]\}}|  ||d q
q| S )N
attributes)get_connection_countappend)rC   r/   countrN   rL   rJ   r-   r.   connection_count_callback   s   zBregister_pools_connection_count.<locals>.connection_count_callback)r*   opentelemetry.metricsrL   r   registerr=   r,   )rJ   rI   rS   rA   r-   rR   r.   register_pools_connection_count   s   
rV   c                 C   B   t du rt a t du rdS z	t j| d W dS  ty    Y dS w )z
    Record a connection timeout event.

    Args:
        pool_name: Connection pool identifier

    Example:
        >>> record_connection_timeout('ConnectionPool<localhost:6379>')
    Nr4   )r   r*   record_connection_timeoutr,   rX   r-   r-   r.   rY        rY   c                 C   r0   )at  
    Record time taken to obtain a connection from the pool.

    Args:
        pool_name: Connection pool identifier
        duration_seconds: Wait time in seconds

    Example:
        >>> start = time.monotonic()
        >>> # ... wait for connection from pool ...
        >>> record_connection_wait_time('ConnectionPool<localhost:6379>', time.monotonic() - start)
    Nr4   r   )r   r*   record_connection_wait_timer,   r[   r-   r-   r.   r\     s   r\   close_reasonr'   c                 C   r0   )a  
    Record a connection closed event.

    Args:
        close_reason: Reason for closing (e.g. 'error', 'application_close')
        error_type: Error type if closed due to error

    Example:
        >>> record_connection_closed('ConnectionPool<localhost:6379>', 'idle_timeout')
    Nr]   r'   )r   r*   record_connection_closedr,   r^   r-   r-   r.   r_   >  s   r_   connection_namemaint_notificationrelaxedc                 C   r7   )a_  
    Record a connection timeout relaxation event.

    Args:
        connection_name: Connection identifier
        maint_notification: Maintenance notification type
        relaxed: True to count up (relaxed), False to count down (unrelaxed)

    Example:
        >>> record_connection_relaxed_timeout('localhost:6379_a1b2c3d4', 'MOVING', True)
    Nr`   ra   rb   )r   r*   !record_connection_relaxed_timeoutr,   rc   r-   r-   r.   rd   \  s   rd   c                 C   rW   )z
    Record a connection handoff event (e.g., after MOVING notification).

    Args:
        pool_name: Connection pool identifier

    Example:
        >>> record_connection_handoff('ConnectionPool<localhost:6379>')
    NrX   )r   r*   record_connection_handoffr,   rX   r-   r-   r.   re   }  rZ   re   Tr(   r)   is_internalc              	   C   sN   t du rt a t du rdS zt j| ||||||d W dS  ty&   Y dS w )a  
    Record error count.

    Args:
        server_address: Server address
        server_port: Server port
        network_peer_address: Network peer address
        network_peer_port: Network peer port
        error_type: Error type (Exception)
        retry_attempts: Retry attempts
        is_internal: Whether the error is internal (e.g., timeout, network error)

    Example:
        >>> record_error_count('localhost', 6379, 'localhost', 6379, ConnectionError(), 3)
    Nr    r!   r(   r)   r'   r%   rf   )r   r*   record_error_countr,   rg   r-   r-   r.   rh     s"   	rh   	directionchannelshardedc                 C   sj   t du rt a t du rdS |}|durt }|dur|jrd}zt j| ||d W dS  ty4   Y dS w )a5  
    Record a PubSub message (published or received).

    Args:
        direction: Message direction ('publish' or 'receive')
        channel: Pub/Sub channel name
        sharded: True if sharded Pub/Sub channel

    Example:
        >>> record_pubsub_message(PubSubDirection.PUBLISH, 'channel', False)
    N)ri   rj   rk   )r   r*   _get_confighide_pubsub_channel_namesrecord_pubsub_messager,   )ri   rj   rk   effective_channelconfigr-   r-   r.   rn     s$   rn   consumer_namez[The consumer_name argument is no longer used and will be removed in the next major version.lag_secondsstream_nameconsumer_groupc                 C   sj   t du rt a t du rdS |}|durt }|dur|jrd}zt j| ||d W dS  ty4   Y dS w )z
    Record the lag of a streaming message.

    Args:
        lag_seconds: Lag in seconds
        stream_name: Stream name
        consumer_group: Consumer group name
        consumer_name: Consumer name
    Nrr   rs   rt   )r   r*   rl   hide_stream_namesrecord_streaming_lagr,   )rr   rs   rt   rq   effective_stream_namerp   r-   r-   r.   rw     s$   rw   c                 C   s\  t du rt a t du rdS | sdS zt  }t }|duo!|j}t| trg| 	 D ]8\}}|r3dnt
|}|D ])}	|	D ]$}
|
\}}t
|}|d\}}td|t|d  }t j|||d q=q9q+W dS | D ]7}t
|d }|rudn|}|d D ]$}
|
\}}t
|}|d\}}td|t|d  }t j|||d q{qiW dS  ty   Y dS w )aQ  
    Record streaming lag from XREAD/XREADGROUP response.

    Parses the response and calculates lag for each message based on message ID timestamp.

    Args:
        response: Response from XREAD/XREADGROUP command
        consumer_group: Consumer group name (for XREADGROUP)
        consumer_name: Consumer name (for XREADGROUP)
    N-g        i  ru   r   r3   )r   r*   r   now	timestamprl   rv   
isinstancedictitemsr   splitmaxintrw   r,   )responsert   rq   rz   rp   rv   rs   stream_messagesrx   messagesmessage
message_id_r{   rr   stream_entryr-   r-   r.   "record_streaming_lag_from_response  s\   
r   c                 C   sJ   t du rt a t du rdS zt j| ||||d W dS  ty$   Y dS w )a  
    Record a maintenance notification count.

    Args:
        server_address: Server address
        server_port: Server port
        network_peer_address: Network peer address
        network_peer_port: Network peer port
        maint_notification: Maintenance notification type (e.g., 'MOVING', 'MIGRATING')

    Example:
        >>> record_maint_notification_count('localhost', 6379, 'localhost', 6379, 'MOVING')
    Nr    r!   r(   r)   ra   )r   r*   record_maint_notification_countr,   r   r-   r-   r.   r   h  s   r   resultc                 C   rW   )zm
    Record a Client Side Caching (CSC) request.

    Args:
        result: CSC result ('hit' or 'miss')
    Nr   )r   r*   record_csc_requestr,   r   r-   r-   r.   r        r   c                  C   sJ   t du rt a t du rdS dd } z	t j| d W dS  ty$   Y dS w )z;
    Initialize observable gauge for CSC items metric.
    Nc                 S   r:   r;   )r   r<   CSC_ITEMS_REGISTRY_KEYr>   r?   r-   r-   r.   rE     rF   z+init_csc_items.<locals>.observable_callbackrG   )r   r*   init_csc_itemsr,   )rE   r-   r-   r.   r     s   
r   rD   c                    sd   t du rt a t du rdS ddlm   fdd}zt }|t| W dS  ty1   Y dS w )z
    Adds given callback to CSC items observable registry.

    Args:
        callback: Callback function that returns the cache size
        pool_name: Connection pool name for observability
    Nr   rK   c                      s     t jddgS )NrX   rM   )r   build_csc_attributesr-   rL   rD   r4   r-   r.   csc_items_callback  s
   
z7register_csc_items_callback.<locals>.csc_items_callback)r   r*   rT   rL   r   rU   r   r,   )rD   r4   r   rA   r-   r   r.   register_csc_items_callback  s   r   rQ   r   c                 C   r0   )z
    Record a Client Side Caching (CSC) eviction.

    Args:
        count: Number of evictions
        reason: Reason for eviction
    NrQ   r   )r   r*   record_csc_evictionr,   r   r-   r-   r.   r     s   r   bytes_savedc                 C   rW   )z
    Record the number of bytes saved by using Client Side Caching (CSC).

    Args:
        bytes_saved: Number of bytes saved
    Nr   )r   r*   record_csc_network_savedr,   r   r-   r-   r.   r     r   r   	fail_fromr   fail_toc                 C   r7   )z
    Record a geo failover.

    Args:
        fail_from: Database failed from
        fail_to: Database failed to
        reason: Reason for the failover
    Nr   r   r   )r   r*   record_geo_failoverr,   r   r-   r-   r.   r     s   r   c                  C   sj   z!t   } | du s| jjsW dS |  tjtj}t|| jW S  t	y+   Y dS  t
y4   Y dS w )z
    Get or create the global metrics collector.

    Returns:
        RedisMetricsCollector instance if observability is enabled, None otherwise
    N)r   get_provider_managerrp   enabled_telemetryget_meter_provider	get_meterr   
METER_NAMEMETER_VERSIONImportErrorr,   )managermeterr-   r-   r.   r*   9  s   
r*   r   c                  C   s6   zt   } | du rW dS | jW S  ty   Y dS w )z
    Get the OTel configuration from the observability manager.

    Returns:
        OTelConfig instance if observability is enabled, None otherwise
    N)r   r   rp   r,   )r   r-   r-   r.   rl   T  s   
rl   c                   C   s   da dS )zM
    Reset the global collector (used for testing or re-initialization).
    N)r   r-   r-   r-   r.   reset_collectord  s   r   c                   C   s   t du rt a t duS )zw
    Check if observability is enabled.

    Returns:
        True if metrics are being collected, False otherwise
    N)r   r*   r-   r-   r-   r.   
is_enabledl  s   	r   )NNNNNNN)r3   )r&   N)NN)NNNNNNT)NNNr;   )A__doc__r   typingr   r   r   r   redis.observability.attributesr   r   r	   r
   r   r   redis.observability.metricsr   r   redis.observability.providersr   redis.observability.registryr   redis.utilsr   r   r   redis.connectionr   redis.multidb.databaser   redis.observability.configr   r   __annotations__r   r=   strfloatr   r,   boolr+   r2   r9   rH   rV   rY   r\   r_   rd   re   rh   rn   rw   r   r   r   r   r   r   r   r   r*   rl   r   r   r-   r-   r-   r.   <module>   s    	
=
$
-

!

!

/
('J
(


&



