o
    <iS                     @   sn  d dl Z d dlZd dlmZmZmZmZmZmZ d dl	m
Z
 d dlmZ d dlmZmZmZmZ d dlmZmZmZ d dlmZ d dlmZmZ d d	lmZ d d
lmZ d dlm Z  d dl!m"Z"m#Z# d dl$m%Z% d dl$m&Z' d dl(m)Z)m*Z*m+Z+ d dl,m-Z- d dl.m/Z/m0Z0m1Z1 d dl2m3Z3 e4e5Z6e3G dd de#e"Z7de%fddZ8G dd de#e"Z9G dd dZ:dS )    N)Any	AwaitableCallableListOptionalUnion)PubSubHandler)DefaultCommandExecutor)DEFAULT_GRACE_PERIODDatabaseConfigInitialHealthCheckMultiDbConfig)AsyncDatabaseDatabase	Databases)AsyncFailureDetector)HealthCheckHealthCheckPolicy)Retry)BackgroundScheduler)	NoBackoff)AsyncCoreCommandsAsyncRedisModuleCommands)CircuitBreaker)State)InitialHealthCheckFailedErrorNoValidDatabaseExceptionUnhealthyDatabaseException)GeoFailoverReason)ChannelT
EncodableTKeyT)experimentalc                   @   sh  e Zd ZdZdefddZdCddZd	d
 Zdd Zdd Z	de
fddZdeddfddZ	dDdedefddZdedefddZdefddZdedefd d!Zd"efd#d$Zd%efd&d'Zd(d) Zd*d+ Zdd,dd-d.ed/geeee f f d0ed1e e! d2ed3e e f
d4d5Z"d6d7 Z#de$e%ef fd8d9Z&d:d; Z'dedefd<d=Z(d>e)d?e*d@e*fdAdBZ+dS )EMultiDBClientz
    Client that operates on multiple logical Redis databases.
    Should be used in Client-side geographic failover database setups.
    configc              
   C   s   |  | _|js| n|j| _|j| _|j | _	|j
s!| n|j
| _|jd u r.| n|j| _| j| j |j| _|j| _|j| _| jtg t| j| j| j| j|j|j| j| jd| _d| _t | _ t! | _"|| _#d | _$g | _%d | _&d S )N)failure_detectors	databasescommand_retryfailover_strategyfailover_attemptsfailover_delayevent_dispatcherauto_fallback_intervalF)'r&   
_databaseshealth_checksdefault_health_checks_health_checkshealth_check_interval_health_check_intervalhealth_check_policyvalue_health_check_policyr%   default_failure_detectors_failure_detectorsr(   default_failover_strategy_failover_strategyset_databasesr,   _auto_fallback_intervalr+   _event_dispatcherr'   _command_retryupdate_supported_errorsConnectionRefusedErrorr	   r)   r*   command_executorinitializedasyncioLock_hc_lockr   _bg_scheduler_config_recurring_hc_task	_hc_tasks_half_open_state_task)selfr$    rK   [/root/parts/websockify/install/lib/python3.10/site-packages/redis/asyncio/multidb/client.py__init__)   sL   






zMultiDBClient.__init__rJ   returnc                    s   | j s|  I d H  | S N)rA   
initializerJ   rK   rK   rL   
__aenter__V   s   zMultiDBClient.__aenter__c                    sj   | j r	| j   | jr| j  | jD ]}|  q| j I d H  | jjr3| jjj	 I d H  d S d S rO   )
rG   cancelrI   rH   r5   closer@   active_databaseclientaclose)rJ   hc_taskrK   rK   rL   rW   [   s   



zMultiDBClient.aclosec                       |   I d H  d S rO   rW   rJ   exc_type	exc_value	tracebackrK   rK   rL   	__aexit__k      zMultiDBClient.__aexit__c                    s   |   I dH  t| j| j| j| _d}| jD ]\}}|j	
| j |j	jtjkr4|s4|| j_d}q|s;tdd| _dS )zT
        Perform initialization of databases to define their initial state.
        NFTz4Initial connection failed - no active database found)_perform_initial_health_checkrB   create_taskrE   run_recurring_asyncr2   _check_databases_healthrG   r-   circuiton_state_changed!_on_circuit_state_change_callbackstateCBStateCLOSEDr@   _active_databaser   rA   )rJ   is_active_db_founddatabaseweightrK   rK   rL   rP   n   s(   
zMultiDBClient.initializec                 C   s   | j S )zE
        Returns a sorted (by weight) list of all databases.
        )r-   rQ   rK   rK   rL   get_databases   s   zMultiDBClient.get_databasesrm   Nc                    s   d}| j D ]\}}||krd} nq|std| |I dH  |jjtjkr?| j dd \}}| j	|t
jI dH  dS td)zL
        Promote one of the existing databases to become an active.
        NT/Given database is not a member of database list   r   z1Cannot set active database, database is unhealthy)r-   
ValueError_check_db_healthre   rh   ri   rj   	get_top_nr@   set_active_databaser   MANUALr   )rJ   rm   existsexisting_db_highest_weighted_dbrK   rK   rL   ru      s&   
z!MultiDBClient.set_active_databaseTskip_initial_health_checkc                    s  |j dtdt di |jr| jjj|jfi |j }n"|jr7|jtdt d | jjj|jd}n
| jjdi |j }|j	du rJ|
 n|j	}t|||j|jd}z
| |I dH  W n tym   |sk Y nw | jdd \}}| j||j | ||I dH  dS )	z
        Adds a new database to the database list.

        Args:
            config: DatabaseConfig object that contains the database configuration.
            skip_initial_health_check: If True, adds the database even if it is unhealthy.
        retryr   )retriesbackoff)connection_poolN)rV   re   rn   health_check_urlrq   rK   )client_kwargsupdater   r   from_urlrF   client_class	from_pool	set_retryre   default_circuit_breakerr   rn   r   rs   r   r-   rt   add_change_active_database)rJ   r$   r{   rV   re   rm   rz   highest_weightrK   rK   rL   add_database   sD   
zMultiDBClient.add_databasenew_databasehighest_weight_databasec                    s>   |j |j kr|jjtjkr| j|tjI d H  d S d S d S rO   )	rn   re   rh   ri   rj   r@   ru   r   	AUTOMATIC)rJ   r   r   rK   rK   rL   r      s   z%MultiDBClient._change_active_databasec                    sZ   | j |}| j dd \}}||kr)|jjtjkr+| j|t	j
I dH  dS dS dS )z<
        Removes a database from the database list.
        rq   r   N)r-   removert   re   rh   ri   rj   r@   ru   r   rv   )rJ   rm   rn   rz   r   rK   rK   rL   remove_database   s   zMultiDBClient.remove_databasern   c                    sp   d}| j D ]\}}||krd} nq|std| j dd \}}| j || ||_| ||I dH  dS )z<
        Updates a database from the database list.
        NTrp   rq   r   )r-   rr   rt   update_weightrn   r   )rJ   rm   rn   rw   rx   ry   rz   r   rK   rK   rL   update_database_weight   s   z$MultiDBClient.update_database_weightfailure_detectorc                 C   s   | j | dS )z>
        Adds a new failure detector to the database.
        N)r7   append)rJ   r   rK   rK   rL   add_failure_detector  s   z"MultiDBClient.add_failure_detectorhealthcheckc              	      sN   | j 4 I dH  | j| W d  I dH  dS 1 I dH s w   Y  dS )z:
        Adds a new health check to the database.
        N)rD   r0   r   )rJ   r   rK   rK   rL   add_health_check  s   .zMultiDBClient.add_health_checkc                    s.   | j s|  I dH  | jj|i |I dH S )zB
        Executes a single command and return its result.
        N)rA   rP   r@   execute_commandrJ   argsoptionsrK   rK   rL   r     s   zMultiDBClient.execute_commandc                 C   s   t | S )z:
        Enters into pipeline mode of the client.
        )PipelinerQ   rK   rK   rL   pipeline'  s   zMultiDBClient.pipelineF
shard_hintvalue_from_callablewatch_delayfuncr   watchesr   r   r   c                   s:   | j s|  I dH  | jj|g|R |||dI dH S )z3
        Executes callable as transaction.
        Nr   )rA   rP   r@   execute_transaction)rJ   r   r   r   r   r   rK   rK   rL   transaction-  s   zMultiDBClient.transactionc                    s&   | j s|  I dH  t| fi |S )z
        Return a Publish/Subscribe object. With this object, you can
        subscribe to channels and listen for messages that get published to
        them.
        N)rA   rP   PubSub)rJ   kwargsrK   rK   rL   pubsubC  s   zMultiDBClient.pubsubc                    s   i  g | _ | jD ]\}}t| |}| |< | j | q	tj| j ddiI dH } fddt| j |D }| D ]\}}t	|t
rZ|j}tj|j_tjd|jd d||< q=|S )	zk
        Runs health checks as a recurring task.
        Runs health checks against all databases.
        return_exceptionsTNc                    s   i | ]	\}} | |qS rK   rK   ).0taskresult
task_to_dbrK   rL   
<dictcomp>^  s    z9MultiDBClient._check_databases_health.<locals>.<dictcomp>z%Health check failed, due to exception)exc_infoF)rH   r-   rB   rb   rs   r   gatherzipitems
isinstancer   rm   ri   OPENre   rh   loggerdebugoriginal_exception)rJ   rm   ry   r   results
db_resultsr   unhealthy_dbrK   r   rL   rd   N  s,   



z%MultiDBClient._check_databases_healthc                    s   |   I dH }d}| jjtjkrd| v}n!| jjtjkr,t| t|d k}n| jjtj	kr9d| v }|sDt
d| jj dS )zj
        Runs initial health check and evaluate healthiness based on initial_health_check_policy.
        NTF   z:Initial health check failed. Initial health check policy: )rd   rF   initial_health_check_policyr   ALL_AVAILABLEvaluesMAJORITY_AVAILABLEsumlenONE_AVAILABLEr   )rJ   r   
is_healthyrK   rK   rL   ra   p  s    z+MultiDBClient._perform_initial_health_checkc                    sX   | j | j|I dH }|s|jjtjkrtj|j_|S |r*|jjtjkr*tj|j_|S )zO
        Runs health checks on the given database until first failure.
        N)r5   executer0   re   rh   ri   r   rj   )rJ   rm   r   rK   rK   rL   rs     s   


zMultiDBClient._check_db_healthre   	old_state	new_statec                 C   s   t  }|tjkrt | |j| _d S |tjkr0|tj	kr0t
d|j d |tt| |tjkrF|tjkrHt
d|j d d S d S d S )Nz	Database z- is unreachable. Failover has been initiated.z is reachable again.)rB   get_running_loopri   	HALF_OPENrb   rs   rm   rI   rj   r   r   warning
call_laterr
   _half_open_circuitinfo)rJ   re   r   r   looprK   rK   rL   rg     s   

z/MultiDBClient._on_circuit_state_change_callback)rJ   r#   rN   r#   )T),__name__
__module____qualname____doc__r   rM   rR   rW   r_   rP   r   ro   r   ru   r   boolr   r   r   floatr   r   r   r   r   r   r   r   r   r   r   r!   r   strr   r   dictr   rd   ra   rs   r   ri   rg   rK   rK   rK   rL   r#   "   sf    
-$
1
	

"r#   re   c                 C   s   t j| _d S rO   )ri   r   rh   )re   rK   rK   rL   r        r   c                   @   s   e Zd ZdZdefddZdddZd	d
 Zdd Zdd Z	de
fddZdefddZdddZdddZd ddZdd Zdee fddZdS )!r   zG
    Pipeline implementation for multiple logical Redis databases.
    rV   c                 C   s   g | _ || _d S rO   )_command_stack_client)rJ   rV   rK   rK   rL   rM     s   
zPipeline.__init__rJ   rN   c                       | S rO   rK   rQ   rK   rK   rL   rR        zPipeline.__aenter__c                    s*   |   I d H  | j|||I d H  d S rO   )resetr   r_   r[   rK   rK   rL   r_     s   zPipeline.__aexit__c                 C   s   |    S rO   )_async_self	__await__rQ   rK   rK   rL   r     r   zPipeline.__await__c                    r   rO   rK   rQ   rK   rK   rL   r     r   zPipeline._async_selfc                 C   s
   t | jS rO   )r   r   rQ   rK   rK   rL   __len__  s   
zPipeline.__len__c                 C   s   dS )z1Pipeline instances should always evaluate to TrueTrK   rQ   rK   rK   rL   __bool__  s   zPipeline.__bool__Nc                    s   g | _ d S rO   )r   rQ   rK   rK   rL   r     s   
zPipeline.resetc                    s   |   I dH  dS )zClose the pipelineN)r   rQ   rK   rK   rL   rW     s   zPipeline.aclosec                 O   s   | j ||f | S )ar  
        Stage a command to be executed when execute() is next called

        Returns the current Pipeline object back so commands can be
        chained together, such as:

        pipe = pipe.set('foo', 'bar').incr('baz').decr('bang')

        At some other point, you can then run: pipe.execute(),
        which will execute all commands queued in the pipe.
        )r   r   r   rK   rK   rL   pipeline_execute_command  s   z!Pipeline.pipeline_execute_commandc                 O   s   | j |i |S )zAdds a command to the stack)r   rJ   r   r   rK   rK   rL   r     s   zPipeline.execute_commandc                    sV   | j js| j  I dH  z| j jt| jI dH W |  I dH  S |  I dH  w )z0Execute all the commands in the current pipelineN)r   rA   rP   r@   execute_pipelinetupler   r   rQ   rK   rK   rL   r     s   
 zPipeline.execute)rJ   r   rN   r   rN   N)rN   r   )r   r   r   r   r#   rM   rR   r_   r   r   intr   r   r   r   rW   r   r   r   r   r   rK   rK   rK   rL   r     s    



r   c                   @   s   e Zd ZdZdefddZd&ddZd'd	d
Zdd Ze	de
fddZdefddZdedefddZdefddZdedefddZdd Z	d(de
dee fdd Zdd!d"d#eddfd$d%ZdS ))r   z2
    PubSub object for multi database client.
    rV   c                 K   s   || _ | j jjdi | dS )zInitialize the PubSub object for a multi-database client.

        Args:
            client: MultiDBClient instance to use for pub/sub operations
            **kwargs: Additional keyword arguments to pass to the underlying pubsub implementation
        NrK   )r   r@   r   )rJ   rV   r   rK   rK   rL   rM     s   zPubSub.__init__rN   c                    r   rO   rK   rQ   rK   rK   rL   rR     r   zPubSub.__aenter__Nc                    rY   rO   rZ   r[   rK   rK   rL   r_   
  r`   zPubSub.__aexit__c                    s   | j jdI d H S )NrW   r   r@   execute_pubsub_methodrQ   rK   rK   rL   rW     s   zPubSub.aclosec                 C   s   | j jjjS rO   )r   r@   active_pubsub
subscribedrQ   rK   rK   rL   r     s   zPubSub.subscribedr   c                    s   | j jjdg|R  I d H S )Nr   r   rJ   r   rK   rK   rL   r     s   zPubSub.execute_commandr   c                    $   | j jjdg|R i |I dH S )aE  
        Subscribe to channel patterns. Patterns supplied as keyword arguments
        expect a pattern name as the key and a callable as the value. A
        pattern's callable will be invoked automatically when a message is
        received on that pattern rather than producing a message via
        ``listen()``.
        
psubscribeNr   r   rK   rK   rL   r        zPubSub.psubscribec                       | j jjdg|R  I dH S )zj
        Unsubscribe from the supplied patterns. If empty, unsubscribe from
        all patterns.
        punsubscribeNr   r   rK   rK   rL   r   %     zPubSub.punsubscribec                    r   )aR  
        Subscribe to channels. Channels supplied as keyword arguments expect
        a channel name as the key and a callable as the value. A channel's
        callable will be invoked automatically when a message is received on
        that channel rather than producing a message via ``listen()`` or
        ``get_message()``.
        	subscribeNr   r   rK   rK   rL   r   .  r   zPubSub.subscribec                    r   )zi
        Unsubscribe from the supplied channels. If empty, unsubscribe from
        all channels
        unsubscribeNr   r   rK   rK   rL   r   :  r   zPubSub.unsubscribeF        ignore_subscribe_messagestimeoutc                    s   | j jjd||dI dH S )a  
        Get the next message if one is available, otherwise None.

        If timeout is specified, the system will wait for `timeout` seconds
        before returning. Timeout should be specified as a floating point
        number or None to wait indefinitely.
        get_message)r   r   Nr   )rJ   r   r   rK   rK   rL   r   C  s   
zPubSub.get_messageg      ?)exception_handlerpoll_timeoutr   c                   s   | j jj||| dI dH S )a  Process pub/sub messages using registered callbacks.

        This is the equivalent of :py:meth:`redis.PubSub.run_in_thread` in
        redis-py, but it is a coroutine. To launch it as a separate task, use
        ``asyncio.create_task``:

            >>> task = asyncio.create_task(pubsub.run())

        To shut it down, use asyncio cancellation:

            >>> task.cancel()
            >>> await task
        )
sleep_timer   r   N)r   r@   execute_pubsub_run)rJ   r   r   rK   rK   rL   runS  s   z
PubSub.run)rN   r   r   )Fr   )r   r   r   r   r#   rM   rR   r_   rW   propertyr   r   r    r   r   r   r   r   r   r   r   r   r   r   r   rK   rK   rK   rL   r     s4    

	

r   );rB   loggingtypingr   r   r   r   r   r   redis.asyncio.clientr   &redis.asyncio.multidb.command_executorr	   redis.asyncio.multidb.configr
   r   r   r   redis.asyncio.multidb.databaser   r   r   &redis.asyncio.multidb.failure_detectorr   !redis.asyncio.multidb.healthcheckr   r   redis.asyncio.retryr   redis.backgroundr   redis.backoffr   redis.commandsr   r   redis.multidb.circuitr   r   ri   redis.multidb.exceptionr   r   r   redis.observability.attributesr   redis.typingr   r    r!   redis.utilsr"   	getLoggerr   r   r#   r   r   r   rK   rK   rK   rL   <module>   s8     
   D