package admin

import (
	"context"
	"crypto/rand"
	"errors"
	"fmt"
	"git.wamblee.org/converge/pkg/comms"
	"git.wamblee.org/converge/pkg/models"
	"git.wamblee.org/converge/pkg/support/ioutils"
	"git.wamblee.org/converge/pkg/testsupport"
	"github.com/stretchr/testify/suite"
	"go.uber.org/goleak"
	"io"
	"net"
	"net/http"
	"strings"
	"sync"
	"testing"
	"time"
)

// Test cases
//
// Overall:
// - Connect an agent, then connect 2 clients
// - Connect multiple agents and clients

// AdminTestSuite is the testify suite exercising Admin. SetupTest creates a
// fresh context, Admin instance and host key before every test method, and
// TearDownTest releases them again.
type AdminTestSuite struct {
	suite.Suite

	// ctx and cancelFunc are created in SetupTest; cancelFunc is invoked in
	// TearDownTest. pprofServer is started in SetupSuite and stopped in
	// TearDownSuite.
	ctx         context.Context
	cancelFunc  context.CancelFunc
	pprofServer *http.Server

	// admin is the instance under test. hostKey is 100 random bytes that are
	// passed to Admin.AddAgent and compared against the key received by the
	// agent side of a registration.
	admin   *Admin
	hostKey []byte
}

// SetupSuite starts the pprof server via testsupport.StartPprof and keeps a
// handle to it so that TearDownSuite can stop it again.
func (s *AdminTestSuite) SetupSuite() {
	server := testsupport.StartPprof("")
	s.pprofServer = server
}

// TearDownSuite stops the pprof server that was started in SetupSuite.
//
// The shutdown gets its own context with a timeout. s.ctx must not be used
// here: it belongs to the most recent test and has already been cancelled by
// TearDownTest, so any shutdown logic that honours the context would give up
// immediately.
func (s *AdminTestSuite) TearDownSuite() {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	testsupport.StopPprof(ctx, s.pprofServer)
}

// SetupTest prepares the per-test fixtures: a test context (with its cancel
// function), a new Admin instance and a 100 byte random host key.
func (s *AdminTestSuite) SetupTest() {
	ctx, cancelFunc := testsupport.CreateTestContext(context.Background(), 10*time.Second)
	s.ctx = ctx
	s.cancelFunc = cancelFunc

	s.admin = NewAdmin()
	s.hostKey = make([]byte, 100)
	// rand.Read returns (n, err). Passing the call directly to an assertion
	// would check the byte count and use err as the failure message, so the
	// error is checked explicitly instead.
	_, err := rand.Read(s.hostKey)
	s.NoError(err)
}

// TearDownTest closes the Admin under test, cancels the per-test context and
// finally lets goleak verify that no goroutines are left running.
func (s *AdminTestSuite) TearDownTest() {
	t := s.T()

	s.admin.Close()
	s.cancelFunc()
	goleak.VerifyNone(t)
}

// TestAdminTestSuite is the go test entry point that runs AdminTestSuite.
func TestAdminTestSuite(t *testing.T) {
	suite.Run(t, new(AdminTestSuite))
}

// AddAgentResult carries the server-side outcome of an agent registration as
// produced by the addAgent helper: the agent connection and the error, if any.
type AddAgentResult struct {
	agentConn *agentConnection
	err       error
}

// agentRegisters performs a complete agent registration over a pipe created
// by testsupport.CreatePipe. The server side (addAgent) and the agent side
// (agentRegistration) are both handed to testsupport.RunAndWait.
//
// requestedPublicId is the id the agent asks for, assignedPublicId the id the
// server is expected to hand out. An empty assignedPublicId means the caller
// expects the registration to fail; in that case the agent-side result is
// returned without any assertions on it.
//
// Returns the server-side result followed by the agent-side result.
func (s *AdminTestSuite) agentRegisters(requestedPublicId, assignedPublicId string) (AddAgentResult, AgentRegisterResult) {
	agentToServerRW, serverToAgentRW := testsupport.CreatePipe(s.ctx)

	serverSide := func() any {
		conn, err := s.addAgent(requestedPublicId, assignedPublicId, serverToAgentRW)
		return AddAgentResult{agentConn: conn, err: err}
	}

	agentSide := func() any {
		outcome := s.agentRegistration(agentToServerRW)
		if assignedPublicId == "" {
			// Failure is expected; the caller inspects the outcome itself.
			return outcome
		}
		s.Nil(outcome.err)
		s.True(outcome.registration.Ok)
		s.Equal(s.hostKey, outcome.registration.HostPrivateKey)
		return outcome
	}

	outcomes := testsupport.RunAndWait(&s.Suite, serverSide, agentSide)
	return outcomes[0].(AddAgentResult), outcomes[1].(AgentRegisterResult)
}

// AgentRegisterResult carries the agent-side outcome of a registration as
// produced by the agentRegistration helper: the registration message received
// from the server, the comm channel created on top of the connection, the
// test listener wrapping the agent listener of that channel's session, and
// the error, if any.
type AgentRegisterResult struct {
	registration comms.AgentRegistration
	commChannel  comms.CommChannel
	listener     *testsupport.TestAgentListener
	err          error
}

// Test_AgentRegisters registers a single agent, checks that it shows up in
// the admin state, removes it again and checks that its session is closed and
// that the agent is gone from a newly created state.
func (s *AdminTestSuite) Test_AgentRegisters() {
	const rendezVousId = "abc"

	serverSide, _ := s.agentRegisters(rendezVousId, rendezVousId)
	s.Nil(serverSide.err)
	conn := serverSide.agentConn

	before := s.admin.CreateNotifification()
	s.Equal(1, len(before.Agents))
	s.Equal(0, len(before.Clients))
	s.Equal(conn.Info, before.Agents[conn.Info.Guid])

	// Unregister the agent; its session must be open before and closed after.
	s.False(conn.CommChannel.Session.IsClosed())
	s.Nil(s.admin.RemoveAgent(models.RendezVousId(rendezVousId)))
	s.True(conn.CommChannel.Session.IsClosed())

	// The state obtained before the removal is expected to be unchanged.
	s.Equal(1, len(before.Agents))
	s.Equal(0, len(before.Clients))

	after := s.admin.CreateNotifification()
	s.Equal(0, len(after.Agents))
	s.Equal(0, len(after.Clients))
}

// Test_ManyAgentsRegister registers ten agents with distinct ids through
// testsupport.RunAndWait and verifies that every one of them is present in
// the admin state afterwards and that no clients are listed.
func (s *AdminTestSuite) Test_ManyAgentsRegister() {
	const agentCount = 10

	registrations := make([]testsupport.TestFunction, 0, agentCount)
	for i := 0; i < agentCount; i++ {
		id := fmt.Sprintf("abc%d", i)
		registrations = append(registrations, func() any {
			serverSide, _ := s.agentRegisters(id, id)
			s.Nil(serverSide.err)
			return serverSide.agentConn
		})
	}

	conns := testsupport.RunAndWait(&s.Suite, registrations...)

	snapshot := s.admin.CreateNotifification()
	s.Equal(len(conns), len(snapshot.Agents))
	s.Equal(0, len(snapshot.Clients))
	for idx := range conns {
		conn := conns[idx].(*agentConnection)
		s.Equal(conn.Info, snapshot.Agents[conn.Info.Guid])
	}
}

// Test_agentDuplicateId registers the same requested id over and over again.
// The first registration is expected to get the id itself, the next 100 are
// expected to get "<id>-<n>" for n = 0..99, and the one after that is expected
// to fail because no unique id can be allocated any more.
func (s *AdminTestSuite) Test_agentDuplicateId() {
	publicId := "abc"

	first, _ := s.agentRegisters(publicId, publicId)
	s.Nil(first.err)

	for n := 0; n < 100; n++ {
		expectedId := fmt.Sprintf("%s-%d", publicId, n)
		duplicate, _ := s.agentRegisters(publicId, expectedId)
		s.Nil(duplicate.err)
	}

	serverSide, agentSide := s.agentRegisters(publicId, "")
	s.NotNil(serverSide.err)
	// Make sure it is the allocation error and not an id mismatch.
	allocationFailed := strings.Contains(serverSide.err.Error(), "could not allocate a new unique id")
	s.True(allocationFailed)
	s.False(agentSide.registration.Ok)
}

// Test_connectClient registers one agent, connects one client to it, checks
// the admin state and then removes the client again. After removal, writes
// on both the client connection and the agent connection must fail with an
// error mentioning "closed".
//
// NOTE(review): suite test methods normally have no return value; the error
// returned here is presumably discarded by the suite runner. Confirm before
// relying on it.
func (s *AdminTestSuite) Test_connectClient() error {
	publicId := "abc"
	serverRes, agentRes := s.agentRegisters(publicId, publicId)
	s.Nil(serverRes.err)
	s.Nil(agentRes.err)

	clientConn, err := s.connectClientToAgent("singleclient", publicId, "connect client test msg", agentRes)
	s.Nil(err)
	if err != nil {
		return err
	}

	// The state must list exactly this agent and this client.
	snapshot := s.admin.CreateNotifification()
	s.Equal(1, len(snapshot.Agents))
	s.Equal(1, len(snapshot.Clients))
	s.Equal(clientConn.Info, snapshot.Clients[clientConn.Info.Guid])

	// Removing the client is expected to close all of its connections, which
	// is verified by attempting to write to them afterwards.
	err = s.admin.RemoveClient(clientConn.Info.ClientId)
	s.Nil(err)

	payload := make([]byte, 10)
	assertWriteFailsClosed := func(write func([]byte) (int, error)) {
		_, writeErr := write(payload)
		s.NotNil(writeErr)
		s.True(strings.Contains(writeErr.Error(), "closed"))
	}
	assertWriteFailsClosed(clientConn.clientConnection.Write)
	assertWriteFailsClosed(clientConn.agentConnection.Write)
	return nil
}

// Test_MultipleAgentsAndClients registers several agents concurrently and
// connects a different number of clients to each of them, also concurrently.
// Every client connection is checked by connectClientToAgent.
//
// Clients of the same agent can be connected in parallel because
// connectClientToAgent looks up the agent-side connection by client id
// through the agent's test listener.
func (s *AdminTestSuite) Test_MultipleAgentsAndClients() {
	clientCounts := []int{10, 5, 37, 1, 29}

	var wg sync.WaitGroup
	for iagent, clientCount := range clientCounts {
		wg.Add(1)
		// Loop values are passed as arguments so the goroutine does not depend
		// on the loop variable semantics of the Go version in use.
		go func(iagent, clientCount int) {
			defer wg.Done()
			publicId := fmt.Sprintf("abc%d", iagent)
			data := fmt.Sprintf("Agent test msg %d ", iagent)

			serverRes, agentRes := s.agentRegisters(publicId, publicId)
			s.Nil(serverRes.err)
			s.Nil(agentRes.err)
			if serverRes.err != nil || agentRes.err != nil {
				// Without a registered agent there is nothing to connect to;
				// the failure has already been recorded above.
				return
			}

			for iclient := 0; iclient < clientCount; iclient++ {
				client := fmt.Sprintf("client %d/%d", iagent, iclient)
				// Safe to Add here: this goroutine has not called Done yet, so
				// the counter cannot have dropped to zero.
				wg.Add(1)
				go func() {
					defer wg.Done()
					_, err := s.connectClientToAgent(client, publicId, data, agentRes)
					s.Nil(err)
				}()
			}
		}(iagent, clientCount)
	}
	wg.Wait()
}

// connectClientToAgent connects a new client to the agent registered under
// publicId and verifies the resulting connections:
//
//  1. The server adds the client (connectClient).
//  2. The agent-side connection for that client is fetched from
//     agentRes.listener by client id.
//  3. data is sent from the server to the agent over that connection.
//  4. Synchronize is started on the client connection and a bidirectional
//     check is run between the client end of the pipe and the agent-side
//     connection.
//
// client is only used to label the end-to-end message. Returns the
// server-side client connection, or an error when the client could not be
// added or the agent-side connection could not be retrieved.
func (s *AdminTestSuite) connectClientToAgent(
	client string, publicId string, data string, agentRes AgentRegisterResult) (*clientConnection, error) {
	serverToClientRW, clientToServerRW := testsupport.CreatePipe(s.ctx)

	// TODO refactoring
	// - TestAgentListener should run in a separate goroutine,
	//   started by TestAgentSuite.
	//
	// TODO split up:
	// 1. server: connects to agent, agent: listens for connections
	//    output:   server: clientConnection with a.o. clientId
	//              agent: listener
	// 2. communication check:
	//    server: use yamux connection to send message
	//    agent: retrieve connection from listener based on client id from clientConnection
	//           -> yamux connection
	//    exchange messages in two directions.
	// 3. bidirectional communication
	//    full communication from client to agent through the converge server.

	// Server side: add the client. The agent is already listening; it accepts
	// all connections and stores them by client id.
	addClient := func() any {
		return s.connectClient(publicId, serverToClientRW)
	}
	serverSideClient := testsupport.RunAndWait(&s.Suite, addClient)[0].(*clientConnection)
	s.NotNil(serverSideClient)
	if serverSideClient == nil {
		return nil, errors.New("Client connection is nil")
	}

	// Agent side: fetch the connection the server set up for this client.
	agentSideConn, err := s.clientConnection(serverSideClient.Info.ClientId, agentRes.listener)
	s.Nil(err)
	if err != nil {
		return nil, err
	}

	// First check server -> agent communication over the new connection.
	serverSideConn := serverSideClient.agentConnection
	testsupport.RunAndWait(
		&s.Suite,
		func() any {
			s.sendYamuxMsgServerToAgent(serverSideConn, data)
			return nil
		},
		func() any {
			s.receiveYamuxMsgServerToAgent(agentSideConn, data)
			return nil
		})

	// Then let the server relay data between client and agent and verify the
	// full path in both directions.
	go serverSideClient.Synchronize()
	testsupport.BidirectionalConnectionCheck(
		&s.Suite, fmt.Sprintf("end-to-end %s", client), clientToServerRW, agentSideConn)

	return serverSideClient, nil
}

// Test_connectClientUnknownRendezVousId registers an agent and then tries to
// add a client for a rendezvous id that differs from the agent's id. Adding
// the client must fail and the admin state must still list only the agent.
func (s *AdminTestSuite) Test_connectClientUnknownRendezVousId() {
	publicId := "abc"
	serverRes, agentRes := s.agentRegisters(publicId, publicId)
	s.Nil(serverRes.err)
	s.Nil(agentRes.err)

	serverToClientRW, _ := testsupport.CreatePipe(s.ctx)

	unknownId := models.RendezVousId(publicId + "sothatisunknown")
	clientRW := ioutils.NewSimpleReadWriteAddrCloser(serverToClientRW, testsupport.DummyRemoteAddr("remoteaddr"))
	_, err := s.admin.AddClient(unknownId, clientRW)
	s.NotNil(err)

	// The failed attempt must not have registered a client.
	snapshot := s.admin.CreateNotifification()
	s.Equal(1, len(snapshot.Agents))
	s.Equal(0, len(snapshot.Clients))
}

// addAgent is the server side of an agent registration: it calls
// Admin.AddAgent with the suite's host key, the requested publicId, an empty
// EnvironmentInfo and the server end of the pipe.
//
// On success it asserts that the id assigned by the server equals
// assignedPublicId and returns the agent connection. On failure it returns
// the error without any assertion.
func (s *AdminTestSuite) addAgent(publicId string, assignedPublicId string, serverToAgentRW io.ReadWriteCloser) (*agentConnection, error) {
	requestedId := models.RendezVousId(publicId)
	conn, err := s.admin.AddAgent(s.hostKey, requestedId, comms.EnvironmentInfo{}, serverToAgentRW)
	if err != nil {
		return nil, err
	}

	assignedId := string(conn.Info.PublicId)
	s.Equal(assignedPublicId, assignedId)
	return conn, nil
}

// agentRegistration is the agent side of a registration: it receives the
// registration message from the server, creates the agent comm channel on
// top of the connection and wraps the agent listener of that channel's
// session in a test listener.
//
// When receiving the message fails only err is set in the result; when
// creating the comm channel fails the received registration and err are set.
func (s *AdminTestSuite) agentRegistration(agentToServerRW io.ReadWriteCloser) AgentRegisterResult {
	var result AgentRegisterResult

	// Wait for the registration message sent by the server.
	registration, err := comms.ReceiveRegistrationMessage(agentToServerRW)
	if err != nil {
		result.err = err
		return result
	}
	result.registration = registration

	channel, err := comms.NewCommChannel(comms.Agent, agentToServerRW)
	if err != nil {
		result.err = err
		return result
	}
	s.NotNil(channel)
	result.commChannel = channel

	result.listener = testsupport.NewTestListener(s.ctx, comms.NewAgentListener(channel.Session))
	return result
}

// connectClient is the server side of a client connect: it adds a client for
// publicId to the admin, using serverToClientRW wrapped with a dummy remote
// address, and asserts that this succeeds. The client connection returned by
// Admin.AddClient is handed back as any so the method can be used from a
// testsupport.RunAndWait function.
func (s *AdminTestSuite) connectClient(publicId string, serverToClientRW io.ReadWriteCloser) any {
	rendezVousId := models.RendezVousId(publicId)
	clientRW := ioutils.NewSimpleReadWriteAddrCloser(serverToClientRW, testsupport.DummyRemoteAddr("remoteaddr"))

	conn, err := s.admin.AddClient(rendezVousId, clientRW)
	s.Nil(err)
	return conn
}

// clientConnection is the agent side of a client connect: it asks the test
// listener for the connection belonging to clientId, asserts that no error
// occurred and returns both the connection and the error.
func (s *AdminTestSuite) clientConnection(clientId models.ClientId, listener *testsupport.TestAgentListener) (net.Conn, error) {
	key := string(clientId)
	conn, err := listener.GetConnection(key)
	s.Nil(err)
	return conn, err
}

// sendYamuxMsgServerToAgent is the server half of the server-to-agent check:
// it writes data to serverToAgentYamux through testsupport.AssertWriteData.
func (s *AdminTestSuite) sendYamuxMsgServerToAgent(serverToAgentYamux io.Writer, data string) {
	assertions := &s.Suite
	testsupport.AssertWriteData(assertions, data, serverToAgentYamux)
}

// receiveYamuxMsgServerToAgent is the agent half of the server-to-agent
// check: it reads from agentToServerYamux through testsupport.AssertReadData,
// passing data as the expected content.
func (s *AdminTestSuite) receiveYamuxMsgServerToAgent(agentToServerYamux io.Reader, data string) {
	assertions := &s.Suite
	testsupport.AssertReadData(assertions, data, agentToServerYamux)
}