import { HttpClient } from "@angular/common/http";
import { Injectable, NgZone } from '@angular/core';
import { Observable } from "rxjs";
import { environment } from "../../../environments/environment";

@Injectable({
  providedIn: 'root'
})
export class SseService {

  topics: Map<string, EventSource> = new Map();

  constructor(
    private zone: NgZone,
    private http: HttpClient
  ) {
  }

  /**
   * Subscribe to SSE from endpoint
   * @param eventId - api endpoint
   * @param token
   */
  getMessagesFrom(eventId: string, token?: string): Observable<any> {
    return new Observable(observer => {

      const source = this.getEventSourceFrom(eventId, token);
      this.topics.set(eventId, source);

      source.addEventListener('message', event => {
        this.zone.run(() => {
          observer.next(event.data);
        });
      })

      source.addEventListener('error', event => {
        console.log(event);
        this.zone.run(() => {
          observer.error(event);
        });
      });
    });
  }

  close(eventId: string): void {
    this.topics.get(eventId).close();
    this.topics.delete(eventId);
    this.http.delete(`${environment.baseUrl}/event/${eventId}/close`).subscribe();
  }

  private getEventSourceFrom(eventId: string, token?: string): EventSource {
    return new EventSource(this.getEndpoint(eventId));
  }


  getEndpoint(eventId: string): string {
    return `${environment.baseUrl}/event/sse/${eventId}`;
  }
}
