o
    <i}Y                     @   sf  d dl Z d dlZd dlZd dlmZmZmZmZ d dlm	Z	m
Z
 d dlmZ d dlmZ d dlmZ d dlmZmZ d dlmZ d d	lmZ d d
lmZ d dlmZ d dlmZmZmZm Z  d dl!m"Z"m#Z#m$Z$ d dl%m&Z&m'Z'm(Z( d dl)m*Z* d dl+m,Z, d dl-m.Z. d dl/m0Z0 e1e2Z3e0G dd deeZ4defddZ5G dd deeZ6G dd dZ7dS )    N)AnyCallableListOptional)HealthCheckHealthCheckPolicy)BackgroundScheduler)	NoBackoff)PubSubWorkerThread)CoreCommandsRedisModuleCommands)MaintNotificationsConfig)CircuitBreaker)State)DefaultCommandExecutor)DEFAULT_GRACE_PERIODDatabaseConfigInitialHealthCheckMultiDbConfig)Database	DatabasesSyncDatabase)InitialHealthCheckFailedErrorNoValidDatabaseExceptionUnhealthyDatabaseException)FailureDetector)GeoFailoverReason)Retry)experimentalc                   @   s0  e Zd ZdZdefddZdd Zdd Zd	efd
dZ	de
d	dfddZ	d:dedefddZde
de
fddZdefddZde
defddZdefddZd efd!d"Zd#d$ Zd%d& Zd'ed(gdf fd)d*Zd+d, Zde
d	efd-d.Zd	eeef fd/d0Zd1d2 Z d3e!d4e"d5e"fd6d7Z#d8d9 Z$dS );MultiDBClientz
    Client that operates on multiple logical Redis databases.
    Should be used in Client-side geographic failover database setups.
    configc              
   C   s   |  | _|js| n|j| _|j| _|j | _	|j
s!| n|j
| _|jd u r.| n|j| _| j| j |j| _|j| _|j| _| jtf t| j| j| j| j|j|j| j| jd| _d| _t | _t ! | _"|| _#d S )N)failure_detectors	databasescommand_retryfailover_strategyfailover_attemptsfailover_delayevent_dispatcherauto_fallback_intervalF)$r"   
_databaseshealth_checksdefault_health_checks_health_checkshealth_check_interval_health_check_intervalhealth_check_policyvalue_health_check_policyr!   default_failure_detectors_failure_detectorsr$   default_failover_strategy_failover_strategyset_databasesr(   _auto_fallback_intervalr'   _event_dispatcherr#   _command_retryupdate_supported_errorsConnectionRefusedErrorr   r%   r&   command_executorinitializedr   _bg_scheduler	threadingLock_hc_lock_config)selfr     rD   S/root/parts/websockify/install/lib/python3.10/site-packages/redis/multidb/client.py__init__*   sF   






zMultiDBClient.__init__c                 C   $   z|    W d S  ty   Y d S w N)close	ExceptionrC   rD   rD   rE   __del__T   s
   zMultiDBClient.__del__c                 C   sv   | j | j | j | j| j d}| jD ]\}}|j| j	 |jj
tjkr/|s/|| j_d}q|s6tdd| _dS )zT
        Perform initialization of databases to define their initial state.
        FTz4Initial connection failed - no active database foundN)r>   run_coro_sync_perform_initial_health_checkrun_recurring_coror.   _check_databases_healthr)   circuiton_state_changed!_on_circuit_state_change_callbackstateCBStateCLOSEDr<   _active_databaser   r=   )rC   is_active_db_founddatabaseweightrD   rD   rE   
initialize]   s"   	
zMultiDBClient.initializereturnc                 C   s   | j S )zE
        Returns a sorted (by weight) list of all databases.
        )r)   rK   rD   rD   rE   get_databases   s   zMultiDBClient.get_databasesrY   Nc                 C   s|   d}| j D ]\}}||krd} nq|std| j| j| |jjtjkr:| j 	dd \}}|t
jf| j_dS td)zL
        Promote one of the existing databases to become an active.
        NT/Given database is not a member of database list   r   z1Cannot set active database, database is unhealthy)r)   
ValueErrorr>   rM   _check_db_healthrQ   rT   rU   rV   	get_top_nr   MANUALr<   active_databaser   )rC   rY   existsexisting_db_highest_weighted_dbrD   rD   rE   set_active_database   s$   z!MultiDBClient.set_active_databaseTskip_initial_health_checkc                 C   s"  t dt d|jd< d|jvrtdd|jd< |jr(| jjj|jfi |j}n"|jr@|jt dt d | jjj|jd}n
| jjdi |j}|j	du rS|
 n|j	}t|||j|jd	}z
| j| j| W n tyv   |st Y nw | jd
d \}}| j||j | || dS )z
        Adds a new database to the database list.

        Args:
            config: DatabaseConfig object that contains the database configuration.
            skip_initial_health_check: If True, adds the database even if it is unhealthy.
        r   )retriesbackoffretrymaint_notifications_configF)enabled)connection_poolN)clientrQ   rZ   health_check_urlr_   rD   )r   r	   client_kwargsr   from_urlrB   client_class	from_pool	set_retryrQ   default_circuit_breakerr   rZ   rr   r>   rM   ra   r   r)   rb   add_change_active_database)rC   r    rj   rq   rQ   rY   rh   highest_weightrD   rD   rE   add_database   sH   

zMultiDBClient.add_databasenew_databasehighest_weight_databasec                 C   s4   |j |j kr|jjtjkr|tjf| j_d S d S d S rH   )	rZ   rQ   rT   rU   rV   r   	AUTOMATICr<   rd   )rC   r}   r~   rD   rD   rE   rz      s   z%MultiDBClient._change_active_databasec                 C   sP   | j |}| j dd \}}||kr$|jjtjkr&|tjf| j	_
dS dS dS )z<
        Removes a database from the database list.
        r_   r   N)r)   removerb   rQ   rT   rU   rV   r   rc   r<   rd   )rC   rY   rZ   rh   r{   rD   rD   rE   remove_database   s   zMultiDBClient.remove_databaserZ   c                 C   sh   d}| j D ]\}}||krd} nq|std| j dd \}}| j || ||_| || dS )z<
        Updates a database from the database list.
        NTr^   r_   r   )r)   r`   rb   update_weightrZ   rz   )rC   rY   rZ   re   rf   rg   rh   r{   rD   rD   rE   update_database_weight   s   z$MultiDBClient.update_database_weightfailure_detectorc                 C   s   | j | dS )z>
        Adds a new failure detector to the database.
        N)r3   append)rC   r   rD   rD   rE   add_failure_detector  s   z"MultiDBClient.add_failure_detectorhealthcheckc                 C   s8   | j  | j| W d   dS 1 sw   Y  dS )z:
        Adds a new health check to the database.
        N)rA   r,   r   )rC   r   rD   rD   rE   add_health_check  s   "zMultiDBClient.add_health_checkc                 O   s    | j s|   | jj|i |S )zB
        Executes a single command and return its result.
        )r=   r[   r<   execute_commandrC   argsoptionsrD   rD   rE   r     s   zMultiDBClient.execute_commandc                 C   s   t | S )z:
        Enters into pipeline mode of the client.
        )PipelinerK   rD   rD   rE   pipeline"  s   zMultiDBClient.pipelinefuncr   c                 O   s&   | j s|   | jj|g||R  S )z3
        Executes callable as transaction.
        )r=   r[   r<   execute_transaction)rC   r   watchesr   rD   rD   rE   transaction(  s   zMultiDBClient.transactionc                 K   s   | j s|   t| fi |S )z
        Return a Publish/Subscribe object. With this object, you can
        subscribe to channels and listen for messages that get published to
        them.
        )r=   r[   PubSub)rC   kwargsrD   rD   rE   pubsub1  s   zMultiDBClient.pubsubc                    s   | j  t| j}W d   n1 sw   Y  | j||I dH }|s3|jjtjkr1tj|j_|S |rA|jjtj	krAtj	|j_|S )zO
        Runs health checks on the given database until first failure.
        N)
rA   listr,   r1   executerQ   rT   rU   OPENrV   )rC   rY   r*   
is_healthyrD   rD   rE   ra   <  s   

zMultiDBClient._check_db_healthc                    s   i  g | _ | jD ]\}}t| |}| |< | j | q	tj| j ddiI dH } fddt| j |D }| D ]\}}t	|t
rZ|j}tj|j_tjd|jd d||< q=|S )	zk
        Runs health checks as a recurring task.
        Runs health checks against all databases.
        return_exceptionsTNc                    s   i | ]	\}} | |qS rD   rD   ).0taskresult
task_to_dbrD   rE   
<dictcomp>_  s    z9MultiDBClient._check_databases_health.<locals>.<dictcomp>z%Health check failed, due to exception)exc_infoF)	_hc_tasksr)   asynciocreate_taskra   r   gatherzipitems
isinstancer   rY   rU   r   rQ   rT   loggerdebugoriginal_exception)rC   rY   rg   r   results
db_resultsr   unhealthy_dbrD   r   rE   rP   O  s,   



z%MultiDBClient._check_databases_healthc                    s   |   I dH }d}| jjtjkrd| v}n!| jjtjkr,t| t|d k}n| jjtj	kr9d| v }|sDt
d| jj dS )zj
        Runs initial health check and evaluate healthiness based on initial_health_check_policy.
        NTF   z:Initial health check failed. Initial health check policy: )rP   rB   initial_health_check_policyr   ALL_AVAILABLEvaluesMAJORITY_AVAILABLEsumlenONE_AVAILABLEr   )rC   r   r   rD   rD   rE   rN   q  s    z+MultiDBClient._perform_initial_health_checkrQ   	old_state	new_statec                 C   s   |t jkr| j| j|j d S |t jkr,|t jkr,t	d|j d | j
tt| |t jkrB|t jkrDtd|j d d S d S d S )Nz	Database z- is unreachable. Failover has been initiated.z is reachable again.)rU   	HALF_OPENr>   run_coro_fire_and_forgetra   rY   rV   r   r   warningrun_oncer   _half_open_circuitinfo)rC   rQ   r   r   rD   rD   rE   rS     s   
z/MultiDBClient._on_circuit_state_change_callbackc                 C   sV   | j rz
| j | jj W n	 ty   Y nw | j   | jjr)| jjj  dS dS )z:
        Closes the client and all its resources.
        N)	r>   rM   r1   rI   rJ   stopr<   rd   rq   rK   rD   rD   rE   rI     s   
zMultiDBClient.close)T)%__name__
__module____qualname____doc__r   rF   rL   r[   r   r]   r   ri   r   boolr|   rz   r   r   floatr   r   r   r   r   r   r   r   r   r   ra   dictrP   rN   r   rU   rS   rI   rD   rD   rD   rE   r   #   sJ    *	&
8
		"
r   rQ   c                 C   s   t j| _d S rH   )rU   r   rT   )rQ   rD   rD   rE   r        r   c                   @   s   e Zd ZdZdefddZdddZdd	 Zd
d Zde	fddZ
defddZdddZdddZdddZdd Zdee fddZdS )r   zG
    Pipeline implementation for multiple logical Redis databases.
    rq   c                 C   s   g | _ || _d S rH   )_command_stack_client)rC   rq   rD   rD   rE   rF     s   
zPipeline.__init__r\   c                 C      | S rH   rD   rK   rD   rD   rE   	__enter__     zPipeline.__enter__c                 C      |    d S rH   reset)rC   exc_type	exc_value	tracebackrD   rD   rE   __exit__  r   zPipeline.__exit__c                 C   rG   rH   r   rJ   rK   rD   rD   rE   rL     s
   zPipeline.__del__c                 C   s
   t | jS rH   )r   r   rK   rD   rD   rE   __len__     
zPipeline.__len__c                 C   s   dS )z1Pipeline instances should always evaluate to TrueTrD   rK   rD   rD   rE   __bool__  s   zPipeline.__bool__Nc                 C   s
   g | _ d S rH   )r   rK   rD   rD   rE   r     r   zPipeline.resetc                 C   s   |    dS )zClose the pipelineNr   rK   rD   rD   rE   rI        zPipeline.closec                 O   s   | j ||f | S )ar  
        Stage a command to be executed when execute() is next called

        Returns the current Pipeline object back so commands can be
        chained together, such as:

        pipe = pipe.set('foo', 'bar').incr('baz').decr('bang')

        At some other point, you can then run: pipe.execute(),
        which will execute all commands queued in the pipe.
        )r   r   r   rD   rD   rE   pipeline_execute_command  s   z!Pipeline.pipeline_execute_commandc                 O   s   | j |i |S )zAdds a command to the stack)r   rC   r   r   rD   rD   rE   r     s   zPipeline.execute_commandc                 C   s<   | j js	| j   z| j jt| jW |   S |   w )z0Execute all the commands in the current pipeline)r   r=   r[   r<   execute_pipelinetupler   r   rK   rD   rD   rE   r     s   
zPipeline.execute)r\   r   r\   N)r   r   r   r   r   rF   r   r   rL   intr   r   r   r   rI   r   r   r   r   r   rD   rD   rD   rE   r     s    



r   c                   @   s   e Zd ZdZdefddZd.ddZd/d	d
Zd/ddZd/ddZ	e
defddZdd Zdd Zdd Zdd Zdd Zdd Zdd Z	 d0d!ed"efd#d$Z	 d0d!ed"efd%d&Z	 			d1d'ed(ed)ee d*edd+f
d,d-ZdS )2r   z2
    PubSub object for multi database client.
    rq   c                 K   s   || _ | j jjdi | dS )zInitialize the PubSub object for a multi-database client.

        Args:
            client: MultiDBClient instance to use for pub/sub operations
            **kwargs: Additional keyword arguments to pass to the underlying pubsub implementation
        NrD   )r   r<   r   )rC   rq   r   rD   rD   rE   rF     s   zPubSub.__init__r\   c                 C   r   rH   rD   rK   rD   rD   rE   r     r   zPubSub.__enter__Nc                 C   rG   rH   r   rK   rD   rD   rE   rL     s
   zPubSub.__del__c                 C   s   | j jdS )Nr   r   r<   execute_pubsub_methodrK   rD   rD   rE   r     s   zPubSub.resetc                 C   r   rH   r   rK   rD   rD   rE   rI     r   zPubSub.closec                 C   s   | j jjjS rH   )r   r<   active_pubsub
subscribedrK   rD   rD   rE   r     r   zPubSub.subscribedc                 G      | j jjdg|R  S )Nr   r   rC   r   rD   rD   rE   r     s
   zPubSub.execute_commandc                 O      | j jjdg|R i |S )aE  
        Subscribe to channel patterns. Patterns supplied as keyword arguments
        expect a pattern name as the key and a callable as the value. A
        pattern's callable will be invoked automatically when a message is
        received on that pattern rather than producing a message via
        ``listen()``.
        
psubscriber   r   rD   rD   rE   r   #     zPubSub.psubscribec                 G   r   )zj
        Unsubscribe from the supplied patterns. If empty, unsubscribe from
        all patterns.
        punsubscriber   r   rD   rD   rE   r   /  
   zPubSub.punsubscribec                 O   r   )aR  
        Subscribe to channels. Channels supplied as keyword arguments expect
        a channel name as the key and a callable as the value. A channel's
        callable will be invoked automatically when a message is received on
        that channel rather than producing a message via ``listen()`` or
        ``get_message()``.
        	subscriber   r   rD   rD   rE   r   8  r   zPubSub.subscribec                 G   r   )zi
        Unsubscribe from the supplied channels. If empty, unsubscribe from
        all channels
        unsubscriber   r   rD   rD   rE   r   D  s   zPubSub.unsubscribec                 O   r   )az  
        Subscribes the client to the specified shard channels.
        Channels supplied as keyword arguments expect a channel name as the key
        and a callable as the value. A channel's callable will be invoked automatically
        when a message is received on that channel rather than producing a message via
        ``listen()`` or ``get_sharded_message()``.
        
ssubscriber   r   rD   rD   rE   r   K  r   zPubSub.ssubscribec                 G   r   )zu
        Unsubscribe from the supplied shard_channels. If empty, unsubscribe from
        all shard_channels
        sunsubscriber   r   rD   rD   rE   r   W  r   zPubSub.sunsubscribeF        ignore_subscribe_messagestimeoutc                 C      | j jjd||dS )a  
        Get the next message if one is available, otherwise None.

        If timeout is specified, the system will wait for `timeout` seconds
        before returning. Timeout should be specified as a floating point
        number, or None, to wait indefinitely.
        get_messager   r   r   rC   r   r   rD   rD   rE   r   `  
   
zPubSub.get_messagec                 C   r   )a&  
        Get the next message if one is available in a sharded channel, otherwise None.

        If timeout is specified, the system will wait for `timeout` seconds
        before returning. Timeout should be specified as a floating point
        number, or None, to wait indefinitely.
        get_sharded_messager   r   r   rD   rD   rE   r   p  r   zPubSub.get_sharded_message
sleep_timedaemonexception_handlersharded_pubsubr
   c                 C   s   | j jj|||| |dS )N)r   r   r   r   )r   r<   execute_pubsub_run)rC   r   r   r   r   rD   rD   rE   run_in_thread  s   zPubSub.run_in_thread)r\   r   r   )Fr   )r   FNF)r   r   r   r   r   rF   r   rL   r   rI   propertyr   r   r   r   r   r   r   r   r   r   r   r   r   r   r   rD   rD   rD   rE   r     sV    


	
	


r   )8r   loggingr?   typingr   r   r   r   !redis.asyncio.multidb.healthcheckr   r   redis.backgroundr   redis.backoffr	   redis.clientr
   redis.commandsr   r   redis.maint_notificationsr   redis.multidb.circuitr   r   rU   redis.multidb.command_executorr   redis.multidb.configr   r   r   r   redis.multidb.databaser   r   r   redis.multidb.exceptionr   r   r   redis.multidb.failure_detectorr   redis.observability.attributesr   redis.retryr   redis.utilsr   	getLoggerr   r   r   r   r   r   rD   rD   rD   rE   <module>   s:    
   C