
    !+ik>                         d Z ddlZddlZddlZddlmZ ddlmZ  ej                  e      Z	 G d d      Z
dad Zd Zy)	z
Webhook Processor - Handles queued webhook events asynchronously.
Fetches order details from ML API and updates the database.
    N)Empty)datetimec                   f    e Zd ZdZd Zd Zd Zd Zd Zd Z	d Z
d	 Zd
 Zd Zd ZddefdZd Zy)WebhookProcessorzI
    Background worker that processes webhook events from the queue.
    c                     || _         || _        || _        d| _        d| _        d| _        d| _        d| _        g | _        y)z
        Args:
            webhook_queue: Queue instance with webhook events
            db_session_factory: Callable that returns a new DB session
            meli_service_factory: Callable that returns a MeliApiService instance
        FNr   )	queuedb_session_factorymeli_service_factoryrunningthreadprocessed_counterror_countlast_processed_atsubscribers)selfwebhook_queuer	   r
   s       L/var/www/hypershopcomercio.com.br/hyper-ai/app/services/webhook_processor.py__init__zWebhookProcessor.__init__   sK     #
"4$8! !%     c                     | j                   rt        j                  d       yd| _         t        j                  | j
                  d      | _        | j                  j                          t        j                  d       y)z&Start the background processor thread.z#[WEBHOOK_PROCESSOR] Already runningNT)targetdaemonz[WEBHOOK_PROCESSOR] Started)	r   loggerwarning	threadingThread_process_loopr   startinfor   s    r   r   zWebhookProcessor.start&   sW    <<NN@A&&d.@.@N12r   c                     d| _         | j                  r| j                  j                  d       t        j	                  d       y)zStop the background processor.F   timeoutz[WEBHOOK_PROCESSOR] StoppedN)r   r   joinr   r   r    s    r   stopzWebhookProcessor.stop1   s2    ;;KKQ'12r   c                 X   | j                   r<	 | j                  j                  d      }| j                  |       | j                   r;yy# t        $ r Y Tt
        $ rL}t        j                  d|        | xj                  dz  c_        t        j                  d       Y d}~id}~ww xY w)zMain processing loop.   r#   z#[WEBHOOK_PROCESSOR] Error in loop: N)r   r   get_process_eventr   	Exceptionr   errorr   timesleep)r   eventes      r   r   zWebhookProcessor._process_loop8   s    ll


q1##E*	 ll
   B1#FG  A% 

1s   -A
 
	B)B)AB$$B)c                    |j                  d      }|j                  dd      }t        j                  d| d|        	 |dk(  r| j                  |       nF|dk(  r| j	                  |       n/|dk(  r| j                  |       nt        j                  d	|        | xj                  d
z  c_        t        j                         | _	        	 ddl
m}  |d||| j                  j                         d       	 | j                          | j!                  d||| j                  j                         d       y# t        $ r"}t        j                  d|        Y d}~fd}~ww xY w# t        $ r"}t        j                  d|        Y d}~d}~ww xY w# t        $ r:}t        j#                  d| d|        | xj$                  d
z  c_        Y d}~yd}~ww xY w)zProcess a single webhook event.topicresource z [WEBHOOK_PROCESSOR] Processing: z -> 	orders_v2itemspaymentsz#[WEBHOOK_PROCESSOR] Ignored topic: r(   r   broadcast_eventwebhook_processed)r2   r3   	timestampz*[WEBHOOK_PROCESSOR] SSE broadcast failed: Nz([WEBHOOK_PROCESSOR] Visits sync failed: )typer2   r3   r;   z%[WEBHOOK_PROCESSOR] Error processing : )r)   r   r   _handle_order_event_handle_item_event_handle_payment_eventr   r   utcnowr   app.api.endpoints.sser9   	isoformatr+   r   _sync_visits_for_update_notify_subscribersr,   r   )r   r/   r2   r3   r9   sse_err
visits_errr0   s           r   r*   zWebhookProcessor._process_eventG   s   		'"99Z,6ugT(LM(	"#((2'!''1*$**84A%IJ  A% %-__%6D"WA 3" (!%!7!7!A!A!C6 X,,.
 $$+$!33==?	&   W!KG9UVVW  X!I*VWWX  	"LL@r!MN!	"sg    BF +D7 8E% .F 7	E" EF E""F %	F.FF FF 	G0GGc                    |j                  dd      j                         }|st        j                  d       yt        j	                  d|        | j                         }| j                  |      }	 |j                  |      }|rddlm	}  |||      }|j                  |      }|j                          t        j	                  d	| d
| d       |dk(  r	 ddlm}	 |j                  dg       }
|
r|
d   ni }|j                  di       j                  dd      } |	d|||t        j                          j#                         d       t        j	                  d|        nt        j	                  d	| d       	 |j                  dg       }
|
D cg c]E  }|j                  di       j                  d      s%|j                  di       j                  d      G }}|r*t        j	                  d|        | j'                  |       nt        j                  d	| d       |j+                          y# t$        $ r"}t        j                  d|        Y d}~d}~ww xY wc c}w # t$        $ r%}t        j                  d| d|        Y d}~md}~ww xY w# t$        $ r!}t        j)                  d| d|         d}~ww xY w# |j+                          w xY w)zU
        Handle orders_v2 topic.
        Resource format: /orders/{order_id}
        z/orders/r4   z*[WEBHOOK_PROCESSOR] Invalid order resourceNz$[WEBHOOK_PROCESSOR] Fetching order: r   )InitialLoadService)meli_clientz[WEBHOOK_PROCESSOR] Order z synced successfully ()createdr8   order_itemsitemtitlezNovo Pedidoorder_update)order_idrO   statusr;   u.   [WEBHOOK_PROCESSOR] 🎉 SSE NOVA VENDA sent: z-[WEBHOOK_PROCESSOR] SSE order_update failed: z was UPDATE, no celebrationidz;[WEBHOOK_PROCESSOR] Triggering visits sync for sold items: item_idsz>[WEBHOOK_PROCESSOR] Failed to trigger visits update for order r=   z not found in APIz([WEBHOOK_PROCESSOR] Error syncing order )replacestripr   r   r   r	   r
   	get_order!app.services.sync_v2.initial_loadrI   _upsert_ordercommitrB   r9   r)   r   rA   rC   r+   rD   r,   close)r   r3   rQ   dbmeli
order_datarI   loaderrR   r9   rM   
first_itemproduct_titlerF   rN   rU   v_errr0   s                     r   r>   z$WebhookProcessor._handle_order_eventx   s    ##J399;NNGH:8*EF $$&((,6	1JP+BDA  --j9		8
BXY_X``abc Y&bI&0nn]B&G7B[^
(2vr(B(F(FwP](^'(0%2&,)1):)D)D)F	9  &TUbTc$de KK"<XJFa bc	z",.."CKKVy4Z^ZbZbcikmZnZrZrswZx 4 8 8 >yHy'bckbl%mn 55x5H !;H:EVWX HHJ/ % b)VW^V_'`aab  z ! z^^&demdnnpqvpw$xyyz
  	LLCH:RPQsST	 HHJs   3A&J B	H7 #J >I* &I%;#I%.I* J 7	I" IJ I""J %I* *	J3JJ JJ 	K$K  KK Kc                    |j                  dd      j                         }|syt        j                  d|        | j	                         }| j                  |      }	 |j                  dd|       }|r~ddlm}  |       }|j                  |       |j                  j                          |j                  j                          t        j                  d| d	       | j                  |g
       |j                          y# t        $ r%}t        j                  d| d|        Y d}~:d}~ww xY w# |j                          w xY w)zO
        Handle items topic.
        Resource format: /items/{item_id}
        /items/r4   Nz![WEBHOOK_PROCESSOR] Item update: GETr   )
SyncEnginez[WEBHOOK_PROCESSOR] Item z syncedrT   z'[WEBHOOK_PROCESSOR] Error syncing item r=   )rV   rW   r   r   r	   r
   requestapp.services.sync_enginerg   
_upsert_adr]   r[   r\   rD   r+   r,   )	r   r3   item_idr]   r^   	item_datarg   enginer0   s	            r   r?   z#WebhookProcessor._handle_item_event   s!   
 ""9b17797yAB $$&((,	UggY,?@I?#!!),		  "		!7yHI ,,wi,@
 HHJ  	SLLB7)2aSQRR	S HHJs+   BD 	D2D-(D5 -D22D5 5Ec                 4    t         j                  d|        y)zu
        Handle payments topic.
        Resource format: /collections/{payment_id} or /payments/{payment_id}
        z$[WEBHOOK_PROCESSOR] Payment update: N)r   r   )r   r3   s     r   r@   z&WebhookProcessor._handle_payment_event   s     	:8*EFr   c                     | j                   dd D ]  }	  ||        y# t        $ r | j                   j                  |       Y 5w xY w)z(Notify all SSE subscribers of an update.N)r   r+   remove)r   data
subscribers      r   rE   z$WebhookProcessor._notify_subscribers   sN    **1- 	4J44 	4  4  ''
34s   $AAc                 :    | j                   j                  |       y)zAdd an SSE subscriber.N)r   appendr   callbacks     r   add_subscriberzWebhookProcessor.add_subscriber   s    )r   c                 X    || j                   v r| j                   j                  |       yy)zRemove an SSE subscriber.N)r   rp   ru   s     r   remove_subscriberz"WebhookProcessor.remove_subscriber   s)    t'''##H- (r   NrU   c           
      ~   ddl m}m} ddlm} ddlm} ddl}t        j                  d| d       | j                         }| j                  |      }	 |j                  d      }	g }
|rG|j                  |      j                  |j                  j!                  |            j#                         }
d	}|j                  |      j                  |j$                  d
k(  |j                  j'                  |
D cg c]  }|j                   c}            j)                  |j*                  j-                               j/                  |      j#                         }|
j1                  |       t        j                  dt3        |
       d|rt3        |      nd dt3        |       d       |j5                         }| |d      z
  }|
D ]/  }	 |j7                  dd|j                   dddd      }|rt9        |t:              rd}|D ]  }|j=                  d      }|s|j?                  |dd       }|j=                  dd      }||z  }|j                  |      j                  |j@                  |j                  k(  |j                  |k(        jC                         }|r||_"         ||j                  ||d      }|jG                  |        ||_$        ||_%        t        jL                         |_        2 |jS                          t        j                  d       |jY                          yc c}w # tN        $ r0}t        jQ                  d|j                   d|        Y d}~d}~ww xY w# tN        $ r2}t        jU                  d|        |jW                          Y d}~d}~ww xY w# |jY                          w xY w) z
        Sync visits data from ML API to update real-time metrics.
        If item_ids provided, syncs only those items.
        Otherwise, syncs a batch of active items.
        r   )date	timedelta)MlMetricsDaily)AdNzI[WEBHOOK_PROCESSOR] Syncing visits for real-time update (Specific Items: z)...MELI_USER_ID2   activez'[WEBHOOK_PROCESSOR] Syncing visits for z items (Specific: z, Background: rK   r(   )daysrf   re   z/visits/time_window   day)lastunit)paramsr{   
   total)rk   r{   visits	sales_qtyz*[WEBHOOK_PROCESSOR] Visits sync error for r=   z)[WEBHOOK_PROCESSOR] Visits sync completedz'[WEBHOOK_PROCESSOR] Visits sync error: )-r   r{   r|   app.models.ml_metrics_dailyr}   app.models.adr~   osr   r   r	   r
   getenvqueryfilterrS   in_allrR   notin_order_byvisits_last_updatedasclimitextendlentodayrh   
isinstancelistr)   fromisoformatrk   firstr   add
visits_30dtotal_visitsnowr+   debugr[   r,   rollbackr\   )r   rU   r{   r|   r}   r~   r   r]   r^   user_idr6   background_limitibackground_itemsr   	yesterdayrN   visits_datatotal_visits_sumvisit_entry
visit_datevisit_date_objr   existing
new_metricitem_errr0   s                              r   rD   z(WebhookProcessor._sync_visits_for_update   s~    	->$_`h_iimno$$&((,R	ii/G E ++BEEIIh,?@DDF  "!xx|22		X%E2qadd23  hr--1134UU;K5LSSU 
 LL)*KKA#e*M_qy`cdl`m  @A  `B  BP  QT  Ue  Qf  Pg  gh  i  jJJLE	q 11I +*"&,,!$''*=>()59 #/ #K #z+t'D+,(+6 7K)4)@J)151C1CJsPRO1T/:w/J 0L @ 0 ,.88N+C+J+J$2$:$:dgg$E$2$7$7>$I," #(%' !)
 $,6BHO1?04-;/;23	2&J %'FF:$6-74 +;,<)3;<<>0O+Z IIKKKCD HHJ} 3f ! LL#MdggYVXYaXb!cd  	LLB1#FGKKMM	 HHJsp   BM, 3L+C M, AL0CL02(M, +M, 0	M)9%M$M, $M))M, ,	N'5(N"N* "N''N* *N<c                     | j                   | j                  j                         | j                  | j                  | j
                  r| j
                  j                         dS ddS )zGet processor status.N)r   
queue_sizer   r   r   )r   r   qsizer   r   r   rC   r    s    r   
get_statuszWebhookProcessor.get_statusc  sb     ||****,#33++GKG]G]!7!7!A!A!C
 	

 dh
 	
r   )N)__name__
__module____qualname____doc__r   r   r&   r   r*   r>   r?   r@   rE   rw   ry   r   rD   r    r   r   r   r      sW    &	33/"bGR!FG4*.
b bH
r   r   c                      t         S )z*Get the global webhook processor instance.)
_processorr   r   r   get_processorr   r  s    r   c                 (    t        | ||      at        S )z(Initialize the global webhook processor.)r   r   )r   r	   r
   s      r   init_processorr   w  s     "-1CEYZJr   )r   loggingr   r-   r   r   r   	getLoggerr   r   r   r   r   r   r   r   r   <module>r      sK        			8	$]
 ]
B 

r   