import { isBoolean, isEqual } from 'lodash-es';
import {
    MonoTypeOperatorFunction,
    Observable,
    TimeoutError,
    combineLatest,
    concat,
    defer,
    merge,
    of,
    throwError
} from 'rxjs';
import {
    catchError,
    debounceTime,
    delay,
    distinctUntilChanged,
    filter,
    finalize,
    first,
    map,
    startWith,
    switchMap,
    switchMapTo,
    take,
    tap,
    timeout,
    toArray,
    withLatestFrom
} from 'rxjs/operators';

import { Inject, Injectable } from '@angular/core';
import { translate } from '@jsverse/transloco';
import {
    AmlUiSharedState,
    One2OneSessionState
} from '@mhp/aml-ui-shared-services';
import { IllegalStateError, lazyShareReplay } from '@mhp/common';
import { UiMatDialogConfig } from '@mhp/ui-components';
import {
    ApplicationStateService,
    CommonDialogsService,
    DialogButtonPlacement,
    DialogButtonType,
    ENGINE_CONTROL_STRATEGY_PROVIDER_TOKEN,
    EngineConnectionStateService,
    EngineControlSocketStrategy,
    EngineControlStrategy,
    EngineStatePatchService,
    ErrorHandlerService,
    L10nService,
    StreamConnectionCutError,
    StreamRendererUnavailableError,
    StreamRendererUnavailableQuotaExceededError,
    StreamSessionNotFoundError,
    UI_SHARED_STATE_STRATEGY_PROVIDER_TOKEN,
    UiSharedStateService,
    UiSharedStateSocketStrategy,
    UiSharedStateStrategy,
    UpdatableStrategyProvider,
    gtmGA4Track,
    selectConfigurationState
} from '@mhp/ui-shared-services';

import { environment } from '../../../environments/environment';
import {
    clearActiveLoadingStateIfInContext,
    setActiveLoadingState
} from '../../common/loading-indicator/state';
import { selectOne2OneState } from '../../dealer/state/selectors/dealer-state.selectors';
import { LocalApplicationState } from '../../state/index';
import { ConfigurationSessionInfoService } from '../session-info/configuration-session-info.service';
import {
    VisualizationMode,
    selectTransitionSilentlyToTargetVisualizationMode,
    setTransitionSilentlyToTargetVisualizationMode
} from '../state/index';
import { StreamHandlerService } from '../stream/stream-handler.service';
import { VisualizationModeService } from '../visualization-mode/visualization-mode.service';
import { EngineDisconnectError } from './errors/engine-disconnect.error';
import { NoUserAckForSwitchToStreamError } from './errors/no-user-ack-for-switch-to-stream.error';

interface RetryDecision {
    // it should be retried immediately
    tryAgain: boolean;
    // should we try to silently upgrade to 3D in the background?
    tryUpgradeToStream: boolean;
    // should the choice about trying again should be treated as a user-choice and persisted
    updateVizModeChoice: boolean;
}

const STREAM_LOADING_CONTEXT = 'STREAM_LOADING';
const NOTIFICATION_DIALOG_WIDTH = '500px';

const DEFAULT_DIALOG_OPTIONS: UiMatDialogConfig = {
    width: NOTIFICATION_DIALOG_WIDTH,
    panelClass: 'mhp-ui-modal-panel--stretch-buttons'
};

/**
 * Service that handles activation and deactivation of stream-visualization mode related
 * logic.
 * Listens to changes in the target-visualization mode.
 * When stream-visualization is set as target, then:
 * - try to get hold of a streaming session
 * - starts listening to engine-connected events
 * - when engine is connected, apply local state to connected engine
 * - listen to engine-patches
 *
 * As long as there is no engine connected, communicate basic-visualization mode as active mode.
 *
 * When basic-visualization is set as target, then:
 * - stop the stream
 * - stop engine connection-checks
 * - stop listen to engine-patches
 */
@Injectable()
export class ConfigurationStreamActivationService {
    constructor(
        private readonly applicationStateService: ApplicationStateService<LocalApplicationState>,
        private readonly visualizationModeService: VisualizationModeService,
        private readonly engineStatePatchService: EngineStatePatchService,
        private readonly uiSharedStateService: UiSharedStateService<AmlUiSharedState>,
        private readonly engineConnectionStateService: EngineConnectionStateService,
        private readonly streamHandlerService: StreamHandlerService,
        private readonly errorHandlerService: ErrorHandlerService,
        private readonly commonDialogService: CommonDialogsService,
        private readonly l10nService: L10nService,
        private readonly configurationSessionInfoService: ConfigurationSessionInfoService,

        @Inject(ENGINE_CONTROL_STRATEGY_PROVIDER_TOKEN)
        private readonly engineControlStrategyProvider: UpdatableStrategyProvider<EngineControlStrategy>,
        private readonly engineControlSocketStrategy: EngineControlSocketStrategy,
        @Inject(UI_SHARED_STATE_STRATEGY_PROVIDER_TOKEN)
        private readonly uiSharedStateStrategyProvider: UpdatableStrategyProvider<
            UiSharedStateStrategy<AmlUiSharedState>
        >,
        private readonly uiSharedStateSocketStrategy: UiSharedStateSocketStrategy<AmlUiSharedState>
    ) {
        this.startDeterminingActiveVisualizationMode();
        this.initOne2OneSessionLogic();
    }

    private startDeterminingActiveVisualizationMode() {
        this.visualizationModeService
            .getTargetVisualizationMode$()
            .pipe(this.handleTargetVisualizationModeSwitch$())
            .subscribe(
                (activeVisualizationMode) => {
                    this.visualizationModeService.setActiveVisualizationMode(
                        activeVisualizationMode
                    );
                },
                (error) => {
                    console.error(
                        'Failed determining active visualization mode',
                        error
                    );
                }
            );
    }

    /**
     * Handle the switch of the target visualization mode and emit the active visualization mode afterwards,
     * depending on if it could be activated or not.
     * @private
     */
    private handleTargetVisualizationModeSwitch$(): MonoTypeOperatorFunction<VisualizationMode> {
        return (visualizationMode$) =>
            <Observable<VisualizationMode>>combineLatest([
                visualizationMode$,
                this.applicationStateService.getLocalState().pipe(
                    selectOne2OneState,
                    map(
                        (one2oneState) =>
                            one2oneState?.targetSessionData?.mainConnection
                                .connectionKey
                    )
                )
            ]).pipe(
                debounceTime(0),
                distinctUntilChanged(isEqual),
                switchMap(([visualizationMode, sessionKey]) => {
                    if (visualizationMode !== VisualizationMode.STREAM) {
                        return of(visualizationMode);
                    }

                    /**
                     * What happens here:
                     * - the task is to get a stream activated and to setup the application to communicate to the underlying engine when successfully connected
                     * - in case of an error, do not retry but change the active mode to 2D and show a modal to the user, indicating the type of error and optionally allow the user to
                     *   keep trying reconnecting silently in the background while working in 2D and getting notified in case an instance is available.
                     */

                    // Now, we should try to connect to the streaming backend
                    const sharedStreamErrors$ = this.streamHandlerService
                        .getStreamErrors$()
                        .pipe(lazyShareReplay());

                    return this.applicationStateService.getLocalState().pipe(
                        selectTransitionSilentlyToTargetVisualizationMode,
                        first(),
                        tap((isPollingInBackground) => {
                            if (!isPollingInBackground) {
                                // indicate that we are trying to load the stream only when not polling in background
                                this.indicateLoadingStreamStatus();
                            }
                        }),
                        map(() => undefined),
                        switchMap(() =>
                            // actually get hold of a stream and error-out in case an error is emitted
                            combineLatest([
                                this.streamHandlerService.startStream$({
                                    sessionCode: sessionKey,
                                    useAudio: false
                                }),
                                sharedStreamErrors$.pipe(
                                    startWith(undefined),
                                    tap((error) => {
                                        if (!error) {
                                            return;
                                        }
                                        throw error.error;
                                    })
                                )
                            ]).pipe(map(() => undefined))
                        ),
                        switchMap(
                            () =>
                                // we have a valid connection to a session. Start checking if we can connect
                                new Observable((subscriber) => {
                                    // start connection-checks to see when engine responds
                                    this.engineConnectionStateService.startConnectionChecks();
                                    subscriber.next();

                                    return () => {
                                        // stop connection checks when observable is unsubscribed
                                        this.engineConnectionStateService.stopConnectionCheck();
                                    };
                                })
                        ),
                        switchMapTo(
                            this.engineConnectionStateService
                                .getEngineConnectionState$()
                                // first, wait x seconds for the engine to respond
                                .pipe(
                                    filter(
                                        (engineConnected) => !!engineConnected
                                    ),
                                    take(1),
                                    timeout(
                                        environment.appConfig.stream
                                            .waitForInitialEngineResponseAfterConnectTimeout
                                    ),
                                    catchError((error) =>
                                        throwError(
                                            new EngineDisconnectError(
                                                'Failed to initially connect to the engine',
                                                error
                                            )
                                        )
                                    )
                                )
                                // then, upon initial successful connect, observe changes in engine-connection state. In case, connection to engine is lost, throw an error
                                .pipe(
                                    switchMapTo(
                                        this.engineConnectionStateService
                                            .getEngineConnectionState$()
                                            .pipe(
                                                tap((engineConnected) => {
                                                    if (engineConnected) {
                                                        gtmGA4Track(
                                                            'stream_engine_active'
                                                        );
                                                        return;
                                                    }
                                                    throw new EngineDisconnectError(
                                                        'Engine disconnected'
                                                    );
                                                })
                                            )
                                    ),
                                    map(() => undefined)
                                )
                        ),
                        switchMap(() => {
                            const dontWaitForAck$ = of(undefined);

                            /* when we are to transition silently to the stream and we have a connection to an instance ready.
                             * So
                             * - set the candidate visualization mode to STREAM
                             * - wait for an external acknowledgement for X ms before timing out to not block forever here
                             */
                            const waitForAck$ = defer(() => {
                                // we have a valid connection to a session and are successfully connected to the engine.
                                this.visualizationModeService.setCandidateVisualizationMode(
                                    VisualizationMode.STREAM
                                );

                                // Check if we should wait for a user-ack before finally switching over to the stream
                                return this.visualizationModeService
                                    .getCandidateVisualizationModeAck$()
                                    .pipe(
                                        filter(
                                            (
                                                isAcknowledged
                                            ): isAcknowledged is boolean =>
                                                isBoolean(isAcknowledged)
                                        ),
                                        take(1),
                                        tap((isAcknowledged) => {
                                            if (!isAcknowledged) {
                                                // user declined offer, track it
                                                gtmGA4Track(
                                                    'stream_available_offer',
                                                    {
                                                        stream_available_response:
                                                            'decline'
                                                    }
                                                );
                                                //
                                                throw new NoUserAckForSwitchToStreamError(
                                                    'User declined offer to switch over to stream'
                                                );
                                            }

                                            // user accepted offer, track it
                                            gtmGA4Track(
                                                'stream_available_offer',
                                                {
                                                    stream_available_response:
                                                        'accept'
                                                }
                                            );
                                        }),
                                        tap(() =>
                                            this.indicateLoadingStreamStatus()
                                        ),
                                        timeout(
                                            environment.appConfig.stream
                                                .waitForUserActionTimeoutWhenStreamBecomesAvailable
                                        ),
                                        catchError((error) => {
                                            if (error instanceof TimeoutError) {
                                                gtmGA4Track(
                                                    'stream_available_offer',
                                                    {
                                                        stream_available_response:
                                                            'timeout'
                                                    }
                                                );
                                                throw new NoUserAckForSwitchToStreamError(
                                                    'User did not acknowledge to switch over to stream',
                                                    error
                                                );
                                            }
                                            throw error;
                                        }),
                                        finalize(() => {
                                            this.applicationStateService.dispatch(
                                                setTransitionSilentlyToTargetVisualizationMode(
                                                    {
                                                        transitionSilently:
                                                            false
                                                    }
                                                )
                                            );
                                        })
                                    );
                            });

                            return this.applicationStateService
                                .getLocalState()
                                .pipe(
                                    selectTransitionSilentlyToTargetVisualizationMode,
                                    first(),
                                    switchMap((transitionSilently) =>
                                        transitionSilently
                                            ? waitForAck$
                                            : dontWaitForAck$
                                    )
                                );
                        }),
                        switchMap(() =>
                            // we have a valid connection to a session and are successfully connected to the engine. Apply initialization tasks
                            concat(
                                this.applyActiveWatermarkToEngine$(),
                                this.applyLocalEngineStateToRemote$(),
                                this.connectToEngineStateUpdatesAndApplyInitialAdjusts(),
                                this.updateOne2OneSessionStatusToActive$()
                            ).pipe(
                                // collect all results before proceeding
                                toArray()
                            )
                        ),
                        // keep watermark updated in case it does change
                        switchMap(
                            () =>
                                new Observable((subscriber) => {
                                    // continue immediately
                                    subscriber.next(undefined);

                                    const subscription =
                                        this.configurationSessionInfoService
                                            .getActiveConfigurationSessionInfo$()
                                            .pipe(
                                                map(
                                                    (sessionInfo) =>
                                                        sessionInfo?.engineData
                                                            .meta?.watermark
                                                            ?.text ?? null
                                                ),
                                                distinctUntilChanged(),
                                                switchMap((watermark) =>
                                                    this.applyWatermarkToEngine$(
                                                        watermark
                                                    )
                                                ),
                                                this.errorHandlerService.applyRetry(
                                                    {
                                                        isEligibleForRetry:
                                                            () => true,
                                                        maxRetries:
                                                            Number.POSITIVE_INFINITY
                                                    }
                                                )
                                            )
                                            .subscribe();

                                    return () => subscription.unsubscribe();
                                })
                        ),
                        switchMap(() =>
                            // all initialization tasks done, we may clear loading-stream status after small delay but continue stream immediately
                            of(undefined)
                                .pipe(
                                    delay(500),
                                    tap(() => this.clearLoadingStreamStatus()),
                                    startWith(undefined)
                                )
                                .pipe(distinctUntilChanged())
                        ),
                        // well, seems all is fine so emit STREAM as active visualization mode
                        map(() => VisualizationMode.STREAM),
                        // in case of an unsubscription (due to target-mode switch) or in case of an error, cleanup
                        finalize(() => {
                            // dismiss a possibly pending loading-state
                            this.clearLoadingStreamStatus();
                            this.engineStatePatchService.stopListenToEngineStateChanges();
                        }),
                        // when an error is thrown, before applying retry-logic, emit basic-vismode as active visualization mode and then let retry-logic determine what to do
                        catchError((error) =>
                            merge(
                                of(VisualizationMode.BASIC),
                                throwError(error)
                            )
                        ),
                        // apply retry-logic incorporating possible user-choices
                        this.errorHandlerService.applyRetry({
                            maxRetries: Number.POSITIVE_INFINITY,
                            maxRetryTimeout: 10000,
                            ignoreDefaultRetryableErrors: true,
                            skipGlobalErrorNotificationCallback: (error) =>
                                error instanceof
                                NoUserAckForSwitchToStreamError,
                            isEligibleForRetry: (error) => {
                                const isAnonymousSession = !sessionKey;

                                if (
                                    error instanceof
                                    NoUserAckForSwitchToStreamError
                                ) {
                                    // we had a session ready but the user did not acknowledge using it - do not retry..
                                    return false;
                                }

                                if (!environment.production) {
                                    console.error(
                                        'Failed connecting to stream',
                                        error
                                    );
                                }

                                // this flag determines if it is possible for the user to choose to continue in 2D and poll in the background to upgrade to 3D
                                const isPossibleToTryToUpgradeToStream = !(
                                    error instanceof StreamConnectionCutError ||
                                    error instanceof
                                        StreamRendererUnavailableQuotaExceededError
                                );

                                let retryDecision$: Observable<RetryDecision>;

                                if (error instanceof StreamConnectionCutError) {
                                    gtmGA4Track('stream_error', {
                                        stream_error_response:
                                            'insufficient bandwidth'
                                    });

                                    retryDecision$ =
                                        this.notifyUserAboutConnectionCutError$(
                                            isAnonymousSession
                                        ).pipe(
                                            map((tryAgain) => ({
                                                tryAgain,
                                                tryUpgradeToStream: false,
                                                updateVizModeChoice: true
                                            }))
                                        );
                                } else if (
                                    error instanceof StreamSessionNotFoundError
                                ) {
                                    // we try to connect to a dedicated session but the session does no longer exist / did never exist
                                    retryDecision$ =
                                        this.notifyUserAboutNonExistingSession$().pipe(
                                            map((tryAgain) => ({
                                                tryAgain,
                                                tryUpgradeToStream: false,
                                                updateVizModeChoice: false
                                            }))
                                        );
                                } else {
                                    const isRendererUnavailable =
                                        error instanceof
                                        StreamRendererUnavailableError;

                                    if (isRendererUnavailable) {
                                        gtmGA4Track('stream_error', {
                                            stream_error_response: 'no render'
                                        });
                                    }

                                    retryDecision$ =
                                        this.notifyUserAboutUnavailableSessionOrConnectionAbort$(
                                            isRendererUnavailable,
                                            isAnonymousSession,
                                            error instanceof
                                                StreamRendererUnavailableQuotaExceededError
                                        ).pipe(
                                            map((tryAgain) => ({
                                                tryAgain,
                                                tryUpgradeToStream:
                                                    !tryAgain &&
                                                    isPossibleToTryToUpgradeToStream,
                                                updateVizModeChoice: false
                                            }))
                                        );
                                }

                                return this.applicationStateService
                                    .getLocalState()
                                    .pipe(
                                        selectTransitionSilentlyToTargetVisualizationMode,
                                        switchMap(
                                            (
                                                transitionSilently
                                            ): Observable<RetryDecision> => {
                                                if (!transitionSilently) {
                                                    // we do not try to transition silently, so ask the user
                                                    return retryDecision$;
                                                }
                                                // we try to transition silently, so retry only in case it's possible
                                                return of({
                                                    tryAgain:
                                                        isPossibleToTryToUpgradeToStream,
                                                    tryUpgradeToStream:
                                                        isPossibleToTryToUpgradeToStream,
                                                    updateVizModeChoice: false
                                                });
                                            }
                                        ),
                                        tap((retryDecision) => {
                                            if (
                                                !retryDecision.tryAgain &&
                                                !retryDecision.tryUpgradeToStream
                                            ) {
                                                // retryDecision is not to try again and not to upgrade to stream, so switch to basic
                                                this.visualizationModeService.setTargetVisualizationMode(
                                                    VisualizationMode.BASIC,
                                                    retryDecision.updateVizModeChoice
                                                );
                                            }
                                            this.applicationStateService.dispatch(
                                                setTransitionSilentlyToTargetVisualizationMode(
                                                    {
                                                        transitionSilently:
                                                            retryDecision.tryUpgradeToStream
                                                    }
                                                )
                                            );
                                        }),
                                        map(
                                            (retryDecision) =>
                                                retryDecision.tryAgain ||
                                                retryDecision.tryUpgradeToStream
                                        )
                                    );
                            }
                        }),
                        catchError((error) => {
                            // somewhere in the chain we got an error and it has not been retried, so revert to BASIC mode
                            this.visualizationModeService.setTargetVisualizationMode(
                                VisualizationMode.BASIC
                            );
                            this.applicationStateService.dispatch(
                                setTransitionSilentlyToTargetVisualizationMode({
                                    transitionSilently: false
                                })
                            );
                            return of(VisualizationMode.BASIC);
                        })
                    );
                })
            );
    }

    private indicateLoadingStreamStatus() {
        // INDICATE THAT WE ARE TRYING TO LOAD THE STREAM
        this.applicationStateService.dispatch(
            setActiveLoadingState({
                context: STREAM_LOADING_CONTEXT,
                loading: true,
                showLoadingSpinnerWhenLoading: true,
                statusText: translate('STREAMING.CONNECTING_STREAM')
            })
        );
    }

    private clearLoadingStreamStatus() {
        this.applicationStateService.dispatch(
            clearActiveLoadingStateIfInContext({
                context: STREAM_LOADING_CONTEXT
            })
        );
    }

    /**
     * Stream unavailable. Ask user how to proceed.
     * Allow a simple retry or continue in 2D-mode enabling a background-check
     * for instances to become available.
     * @param isRendererUnavailableError If the cause of the notification is a renderer not being available (true) or some other error (false).
     * @param isAnonymousSession If the current session is an anonymous one or based on a session key
     * @param isQuotaExceeded If the quota has been exceeded
     */
    private notifyUserAboutUnavailableSessionOrConnectionAbort$(
        isRendererUnavailableError: boolean,
        isAnonymousSession: boolean,
        isQuotaExceeded: boolean
    ): Observable<boolean> {
        let text = translate('STREAMING.ERROR.DESCRIPTION');
        if (isRendererUnavailableError) {
            text = translate(
                isQuotaExceeded
                    ? 'STREAMING.INSTANCE_AVAILABILITY.DESCRIPTION_NO_QUOTA'
                    : 'STREAMING.INSTANCE_AVAILABILITY.DESCRIPTION'
            );
        }

        return this.commonDialogService
            .openAdvancedConfirmDialog$(
                translate(
                    isRendererUnavailableError
                        ? 'STREAMING.INSTANCE_AVAILABILITY.HEADLINE'
                        : 'STREAMING.ERROR.HEADLINE'
                ),
                text,
                isAnonymousSession
                    ? [
                          ...(isQuotaExceeded
                              ? []
                              : [
                                    {
                                        id: 'TRY_AGAIN',
                                        type: DialogButtonType.SECONDARY,
                                        label: translate(
                                            isRendererUnavailableError
                                                ? 'STREAMING.INSTANCE_AVAILABILITY.CTA_TRY_AGAIN'
                                                : 'STREAMING.ERROR.CTA_TRY_AGAIN'
                                        ),
                                        placement: DialogButtonPlacement.LEFT
                                    }
                                ]),
                          {
                              id: 'CANCEL',
                              type: DialogButtonType.PRIMARY,
                              label: translate(
                                  isRendererUnavailableError
                                      ? 'STREAMING.INSTANCE_AVAILABILITY.CTA_CONTINUE'
                                      : 'STREAMING.ERROR.CTA_CONTINUE'
                              ),
                              placement: DialogButtonPlacement.RIGHT
                          }
                      ]
                    : [
                          {
                              id: 'TRY_AGAIN',
                              type: DialogButtonType.PRIMARY,
                              label: translate(
                                  isRendererUnavailableError
                                      ? 'STREAMING.INSTANCE_AVAILABILITY.CTA_TRY_AGAIN'
                                      : 'STREAMING.ERROR.CTA_TRY_AGAIN'
                              ),
                              placement: DialogButtonPlacement.RIGHT
                          }
                      ],
                {
                    dialogOptions: DEFAULT_DIALOG_OPTIONS
                }
            )
            .pipe(map((value) => value.result === 'TRY_AGAIN'));
    }

    /**
     * Connection cut due to possible bandwidth limitations. Show dialog to user and provide option to continue in 2D.
     */
    private notifyUserAboutConnectionCutError$(isAnonymousSession: boolean) {
        return this.commonDialogService
            .openAdvancedConfirmDialog$(
                translate('STREAMING.BANDWITH.HEADLINE'),
                translate('STREAMING.BANDWITH.DESCRIPTION'),
                isAnonymousSession
                    ? [
                          {
                              id: 'CANCEL',
                              type: DialogButtonType.PRIMARY,
                              label: translate(
                                  'STREAMING.BANDWITH.CTA_CONTINUE'
                              )
                          }
                      ]
                    : [
                          {
                              id: 'TRY_AGAIN',
                              type: DialogButtonType.PRIMARY,
                              label: translate(
                                  'STREAMING.BANDWITH.CTA_TRY_AGAIN'
                              )
                          }
                      ],
                {
                    dialogOptions: DEFAULT_DIALOG_OPTIONS
                }
            )
            .pipe(map((value) => value.result === 'TRY_AGAIN'));
    }

    /**
     * Connection to a dedicated session could not be established as the session in question does not exist.
     */
    private notifyUserAboutNonExistingSession$() {
        return this.commonDialogService
            .openAdvancedConfirmDialog$(
                translate('STREAMING.SESSION_AVAILABILITY.HEADLINE'),
                translate('STREAMING.SESSION_AVAILABILITY.DESCRIPTION'),
                [
                    {
                        id: 'CONFIRM',
                        type: DialogButtonType.PRIMARY,
                        label: translate('STREAMING.BANDWITH.CTA_CONTINUE')
                    }
                ],
                {
                    dialogOptions: DEFAULT_DIALOG_OPTIONS
                }
            )
            .pipe(map(() => false));
    }

    /**
     * Sends the currently held UiGlobalApplicationState.engine state branch to the
     * engine to be applied there.
     */
    private applyLocalEngineStateToRemote$(): Observable<void> {
        // apply current configuration
        return this.applicationStateService.getState().pipe(
            selectConfigurationState,
            withLatestFrom(this.l10nService.getActiveCountry$()),
            first(),
            switchMap(([engineConfigurationState, activeCountry]) => {
                if (!engineConfigurationState) {
                    return of(undefined);
                }
                if (!activeCountry) {
                    throw new IllegalStateError('No active country set');
                }

                // apply country to state
                let patchedEngineConfigurationState = engineConfigurationState;
                if (engineConfigurationState.productState) {
                    patchedEngineConfigurationState = {
                        ...engineConfigurationState,
                        productState: {
                            ...engineConfigurationState.productState,
                            country: activeCountry
                        }
                    };
                }

                return this.engineControlSocketStrategy
                    .applyEngineControlState$(patchedEngineConfigurationState)
                    .pipe(this.errorHandlerService.applyRetry());
            })
        );
    }

    private initOne2OneSessionLogic() {
        const updateSharedCountrySubscription = this.l10nService
            .getActiveCountry$()
            .pipe(
                switchMap((activeCountry) =>
                    this.uiSharedStateService
                        .updateUiState((uiState) => {
                            uiState.overrideCountry = activeCountry;

                            return uiState;
                        })
                        .pipe(
                            this.errorHandlerService.applyRetry({
                                maxRetries: Number.POSITIVE_INFINITY,
                                isEligibleForRetry: () => true
                            })
                        )
                )
            )
            .subscribe();

        return () => {
            updateSharedCountrySubscription.unsubscribe();
        };
    }

    private updateToStreamBackedStrategiesWhenConnected() {
        // update EngineControlStrategy to socket-io-based
        this.engineControlStrategyProvider.setStrategy(
            this.engineControlSocketStrategy,
            {
                type: 'STREAM'
            }
        );

        // update UiSharedStateStrategy to socket-io-based
        this.uiSharedStateStrategyProvider.setStrategy(
            this.uiSharedStateSocketStrategy,
            {
                type: 'STREAM'
            }
        );
    }

    /**
     * Before fetching the engine state, keep hold of initial local states
     * and apply these to the engine state when it was initially fetched from remote.
     * @private
     */
    private connectToEngineStateUpdatesAndApplyInitialAdjusts() {
        return defer(() =>
            combineLatest([
                // get local state first
                this.applicationStateService.getEngineState().pipe(
                    map((state) => state.uiState),
                    first()
                ),
                this.engineStatePatchService.startListenToEngineStateChanges()
            ]).pipe(
                first(),
                tap(() => {
                    // update to stream-backend strategies
                    this.updateToStreamBackedStrategiesWhenConnected();
                }),
                switchMap(([latestUiState, appState]) =>
                    // initial engine state is here, update contained shared ui-state using locally known ui-state
                    this.uiSharedStateService
                        .updateUiState(() => latestUiState)
                        .pipe(
                            this.errorHandlerService.applyRetry({
                                isEligibleForRetry: () => true
                            })
                        )
                )
            )
        );
    }

    private updateOne2OneSessionStatusToActive$() {
        return this.uiSharedStateService
            .updateUiState((uiState) => ({
                ...uiState,
                sessionState: One2OneSessionState.ACTIVE
            }))
            .pipe(
                this.errorHandlerService.applyRetry({
                    maxRetries: 10,
                    isEligibleForRetry: () => true
                }),
                catchError((error) => {
                    console.warn(
                        'Failed applying local ui-state to remote',
                        error
                    );
                    return of(undefined);
                })
            );
    }

    private applyActiveWatermarkToEngine$() {
        return this.configurationSessionInfoService
            .getActiveConfigurationSessionInfo$()
            .pipe(
                take(1),
                switchMap((sessionInfo) =>
                    this.applyWatermarkToEngine$(
                        sessionInfo?.engineData.meta?.watermark?.text ?? null
                    ).pipe(this.errorHandlerService.applyRetry())
                )
            );
    }

    private applyWatermarkToEngine$(watermark: string | null) {
        return this.engineControlSocketStrategy.setWatermark$(watermark);
    }
}
