From 75d1c86ea364799570abf6705385620b73c79c4b Mon Sep 17 00:00:00 2001 From: Maxim Lebedev Date: Tue, 14 Mar 2023 04:19:53 +0600 Subject: [PATCH] :ok_hand: Refactored after pass websub.rocks tests --- internal/common/common.go | 2 + internal/domain/callback.go | 37 ---- internal/domain/challenge.go | 9 +- internal/domain/lease_seconds.go | 34 ---- internal/domain/push.go | 26 --- internal/domain/subscription.go | 70 ++++++-- internal/domain/suid.go | 22 +-- internal/domain/topic.go | 37 ++-- internal/hub/delivery/http/hub_http.go | 142 +++++++-------- internal/hub/delivery/http/hub_http_test.go | 53 +++--- internal/hub/usecase.go | 5 +- internal/hub/usecase/hub_ucase.go | 166 +++++++++--------- internal/hub/usecase/hub_ucase_test.go | 43 +++++ internal/middleware/logfmt.go | 1 + internal/subscription/repository.go | 2 +- .../repository/memory/memory_subscription.go | 24 ++- internal/subscription/usecase.go | 3 +- .../usecase/subscription_ucase.go | 81 ++++++++- .../usecase/subscription_ucase_test.go | 73 ++++++++ internal/topic/repository.go | 25 +++ .../topic/repository/memory/memory_topic.go | 85 +++++++++ internal/topic/usecase.go | 10 ++ internal/topic/usecase/topic_ucase.go | 66 +++++++ internal/topic/usecase/topic_ucase_test.go | 43 +++++ internal/urlutil/shift_path_test.go | 37 ++++ main.go | 29 ++- web/static/manifest.webmanifest | 38 ++-- web/template/baseof.qtpl | 6 +- 28 files changed, 764 insertions(+), 405 deletions(-) delete mode 100644 internal/domain/callback.go delete mode 100644 internal/domain/lease_seconds.go delete mode 100644 internal/domain/push.go create mode 100644 internal/hub/usecase/hub_ucase_test.go create mode 100644 internal/subscription/usecase/subscription_ucase_test.go create mode 100644 internal/topic/repository.go create mode 100644 internal/topic/repository/memory/memory_topic.go create mode 100644 internal/topic/usecase.go create mode 100644 internal/topic/usecase/topic_ucase.go create mode 100644 internal/topic/usecase/topic_ucase_test.go create mode 100644 internal/urlutil/shift_path_test.go diff --git a/internal/common/common.go b/internal/common/common.go index 1a97698..b3c68df 100644 --- a/internal/common/common.go +++ b/internal/common/common.go @@ -5,6 +5,8 @@ const ( MIMEApplicationFormCharsetUTF8 = MIMEApplicationForm + "; " + charsetUTF8 MIMETextHTML = "text/html" MIMETextHTMLCharsetUTF8 = MIMETextHTML + "; " + charsetUTF8 + MIMETextPlain = "text/plain" + MIMETextPlainCharsetUTF8 = MIMETextPlain + "; " + charsetUTF8 charsetUTF8 = "charset=UTF-8" ) diff --git a/internal/domain/callback.go b/internal/domain/callback.go deleted file mode 100644 index 1cae0f1..0000000 --- a/internal/domain/callback.go +++ /dev/null @@ -1,37 +0,0 @@ -package domain - -import ( - "fmt" - "net/url" - - "source.toby3d.me/toby3d/hub/internal/common" -) - -// Callback describes the URL at which a subscriber wishes to receive content -// distribution requests. -type Callback struct { - callback *url.URL -} - -func ParseCallback(str string) (*Callback, error) { - u, err := url.Parse(str) - if err != nil { - return nil, fmt.Errorf("cannot parse string as callback URL: %w", err) - } - - return &Callback{callback: u}, nil -} - -func (c Callback) AddQuery(q url.Values) { - q.Add(common.HubCallback, c.callback.String()) -} - -func (c Callback) URL() *url.URL { - u, _ := url.Parse(c.callback.String()) - - return u -} - -func (c Callback) String() string { - return c.callback.String() -} diff --git a/internal/domain/challenge.go b/internal/domain/challenge.go index e6e45c8..6e01d59 100644 --- a/internal/domain/challenge.go +++ b/internal/domain/challenge.go @@ -1,7 +1,6 @@ package domain import ( - "bytes" "crypto/rand" "encoding/base64" "fmt" @@ -11,7 +10,7 @@ import ( ) type Challenge struct { - challenge []byte + challenge string } func NewChallenge(length uint8) (*Challenge, error) { @@ -20,15 +19,15 @@ func NewChallenge(length uint8) (*Challenge, error) { return nil, fmt.Errorf("cannot create a new challenge: %w", err) } - return &Challenge{challenge: []byte(base64.URLEncoding.EncodeToString(src))}, nil + return &Challenge{challenge: base64.URLEncoding.EncodeToString(src)}, nil } func (c Challenge) AddQuery(q url.Values) { q.Add(common.HubChallenge, string(c.challenge)) } -func (c Challenge) Equal(target []byte) bool { - return bytes.Equal(c.challenge, target) +func (c Challenge) Equal(target string) bool { + return c.challenge == target } func (c Challenge) String() string { diff --git a/internal/domain/lease_seconds.go b/internal/domain/lease_seconds.go deleted file mode 100644 index 0d12fd6..0000000 --- a/internal/domain/lease_seconds.go +++ /dev/null @@ -1,34 +0,0 @@ -package domain - -import ( - "net/url" - "strconv" - - "source.toby3d.me/toby3d/hub/internal/common" -) - -// LeaseSeconds describes a number of seconds for which the subscriber would -// like to have the subscription active, given as a positive decimal integer. -// Hubs MAY choose to respect this value or not, depending on their own -// policies, and MAY set a default value if the subscriber omits the parameter. -// This parameter MAY be present for unsubscription requests and MUST be ignored -// by the hub in that case. -type LeaseSeconds struct { - leaseSeconds uint -} - -func NewLeaseSeconds(raw uint) LeaseSeconds { - return LeaseSeconds{leaseSeconds: raw} -} - -func (ls LeaseSeconds) AddQuery(q url.Values) { - if ls.leaseSeconds == 0 { - return - } - - q.Add(common.HubLeaseSeconds, strconv.FormatUint(uint64(ls.leaseSeconds), 10)) -} - -func (ls LeaseSeconds) IsZero() bool { - return ls.leaseSeconds == 0 -} diff --git a/internal/domain/push.go b/internal/domain/push.go deleted file mode 100644 index 3c7e4a3..0000000 --- a/internal/domain/push.go +++ /dev/null @@ -1,26 +0,0 @@ -package domain - -import ( - "crypto/hmac" - "encoding/hex" - "hash" - "net/http" - - "source.toby3d.me/toby3d/hub/internal/common" -) - -type Push struct { - Self Topic - ContentType string - Content []byte -} - -func (p Push) SetXHubSignatureHeader(req *http.Request, alg Algorithm, secret Secret) { - if alg == AlgorithmUnd || secret.secret == "" { - return - } - - h := func() hash.Hash { return alg.Hash() } - req.Header.Set(common.HeaderXHubSignature, alg.algorithm+"="+hex.EncodeToString(hmac.New(h, - []byte(secret.secret)).Sum(p.Content))) -} diff --git a/internal/domain/subscription.go b/internal/domain/subscription.go index d4a708d..e1d02fc 100644 --- a/internal/domain/subscription.go +++ b/internal/domain/subscription.go @@ -1,46 +1,78 @@ package domain import ( - "math/rand" "net/url" + "strconv" "testing" + "time" + + "source.toby3d.me/toby3d/hub/internal/common" ) // Subscription is a unique relation to a topic by a subscriber that indicates // it should receive updates for that topic. type Subscription struct { - Topic Topic - Callback Callback - Secret Secret - LeaseSeconds LeaseSeconds -} + // First creation datetime + CreatedAt time.Time -func (s Subscription) SUID() SUID { - return NewSSID(s.Topic, s.Callback) + // Last updating datetime + UpdatedAt time.Time + + // Datetime when subscription must be deleted + ExpiredAt time.Time + + // Datetime synced with topic updating time + SyncedAt time.Time + + Callback *url.URL + Topic *url.URL + + Secret Secret } func (s Subscription) AddQuery(q url.Values) { - for _, w := range []QueryAdder{s.Callback, s.Topic, s.LeaseSeconds, s.Secret} { - w.AddQuery(q) + s.Secret.AddQuery(q) + q.Add(common.HubTopic, s.Topic.String()) + q.Add(common.HubCallback, s.Callback.String()) + q.Add(common.HubLeaseSeconds, strconv.FormatFloat(s.LeaseSeconds(), 'g', 0, 64)) +} + +func (s Subscription) SUID() SUID { + return SUID{ + topic: s.Topic.String(), + callback: s.Callback.String(), } } +func (s Subscription) LeaseSeconds() float64 { + return s.ExpiredAt.Sub(s.UpdatedAt).Round(time.Second).Seconds() +} + +func (s Subscription) Synced(t Topic) bool { + return s.SyncedAt.Equal(t.UpdatedAt) || s.SyncedAt.After(t.UpdatedAt) +} + +func (s Subscription) Expired(ts time.Time) bool { + return s.ExpiredAt.Before(ts) +} + func TestSubscription(tb testing.TB, callbackUrl string) *Subscription { tb.Helper() - callback, err := ParseCallback(callbackUrl) + callback, err := url.Parse(callbackUrl) if err != nil { tb.Fatal(err) } + ts := time.Now().UTC().Round(time.Second) + secret := TestSecret(tb) + return &Subscription{ - Topic: Topic{topic: &url.URL{ - Scheme: "https", - Host: "example.com", - Path: "lipsum", - }}, - Callback: *callback, - Secret: *TestSecret(tb), - LeaseSeconds: NewLeaseSeconds(uint(rand.Intn(60))), + CreatedAt: ts, + UpdatedAt: ts, + ExpiredAt: ts.Add(10 * 24 * time.Hour).Round(time.Second), + Callback: callback, + Topic: &url.URL{Scheme: "https", Host: "example.com", Path: "/lipsum"}, + Secret: *secret, } } diff --git a/internal/domain/suid.go b/internal/domain/suid.go index 091bb44..2172e9c 100644 --- a/internal/domain/suid.go +++ b/internal/domain/suid.go @@ -1,29 +1,25 @@ package domain +import "net/url" + // SUID describes a subscription's unique key is the tuple ([Topic] URL, // Subscriber [Callback] URL). type SUID struct { - suid [2]string + topic string + callback string } -func NewSSID(topic Topic, callback Callback) SUID { +func NewSSID(topic Topic, callback *url.URL) SUID { return SUID{ - suid: [2]string{topic.topic.String(), callback.callback.String()}, + topic: topic.Self.String(), + callback: callback.String(), } } func (suid SUID) Equal(target SUID) bool { - for i := range suid.suid { - if suid.suid[i] == target.suid[i] { - continue - } - - return false - } - - return true + return suid.topic == target.topic && suid.callback == target.callback } func (suid SUID) GoString() string { - return "domain.SUID(" + suid.suid[0] + ":" + suid.suid[1] + ")" + return "domain.SUID(" + suid.topic + ":" + suid.callback + ")" } diff --git a/internal/domain/topic.go b/internal/domain/topic.go index b69b12a..716d36c 100644 --- a/internal/domain/topic.go +++ b/internal/domain/topic.go @@ -1,38 +1,43 @@ package domain import ( - "fmt" "net/url" + "testing" + "time" "source.toby3d.me/toby3d/hub/internal/common" ) -// Topic is a HTTP [RFC7230] (or HTTPS [RFC2818]) resource URL. The unit to -// which one can subscribe to changes. -// -// [RFC7230]: https://tools.ietf.org/html/rfc7230 -// [RFC2818]: https://tools.ietf.org/html/rfc2818 type Topic struct { - topic *url.URL + CreatedAt time.Time + UpdatedAt time.Time + Self *url.URL + ContentType string + Content []byte } -func ParseTopic(str string) (*Topic, error) { - u, err := url.Parse(str) - if err != nil { - return nil, fmt.Errorf("cannot parse string as topic URL: %w", err) - } +func TestTopic(tb testing.TB) *Topic { + tb.Helper() - return &Topic{topic: u}, nil + now := time.Now().UTC().Add(-1 * time.Hour) + + return &Topic{ + CreatedAt: now, + UpdatedAt: now, + Self: &url.URL{Scheme: "https", Host: "example.com", Path: "/"}, + ContentType: "text/html", + Content: []byte("hello, world"), + } } func (t Topic) AddQuery(q url.Values) { - q.Add(common.HubTopic, t.topic.String()) + q.Add(common.HubTopic, t.Self.String()) } func (t Topic) Equal(target Topic) bool { - return t.topic.String() == target.topic.String() + return t.Self.String() == target.Self.String() } func (t Topic) String() string { - return t.topic.String() + return t.Self.String() } diff --git a/internal/hub/delivery/http/hub_http.go b/internal/hub/delivery/http/hub_http.go index 875497a..b637e57 100644 --- a/internal/hub/delivery/http/hub_http.go +++ b/internal/hub/delivery/http/hub_http.go @@ -1,6 +1,7 @@ package http import ( + "context" "errors" "fmt" "net/http" @@ -14,27 +15,29 @@ import ( "source.toby3d.me/toby3d/hub/internal/domain" "source.toby3d.me/toby3d/hub/internal/hub" "source.toby3d.me/toby3d/hub/internal/subscription" + "source.toby3d.me/toby3d/hub/internal/topic" "source.toby3d.me/toby3d/hub/web/template" ) type ( Request struct { - Callback domain.Callback - Topic domain.Topic + Callback *url.URL + Topic *url.URL Secret domain.Secret Mode domain.Mode - LeaseSeconds domain.LeaseSeconds + LeaseSeconds float64 } Response struct { Mode domain.Mode - Topic domain.Topic Reason string + Topic domain.Topic } NewHandlerParams struct { Hub hub.UseCase Subscriptions subscription.UseCase + Topics topic.UseCase Matcher language.Matcher Name string } @@ -42,12 +45,13 @@ type ( Handler struct { hub hub.UseCase subscriptions subscription.UseCase + topics topic.UseCase matcher language.Matcher name string } ) -var DefaultRequestLeaseSeconds = domain.NewLeaseSeconds(uint(time.Duration(time.Hour * 24 * 10).Seconds())) // 10 days +var DefaultRequestLeaseSeconds = time.Duration(10 * 24 * time.Hour).Seconds() // 10 days var ( ErrHubMode = errors.New(common.HubMode + " MUST be " + domain.ModeSubscribe.String() + " or " + @@ -61,74 +65,71 @@ func NewHandler(params NewHandlerParams) *Handler { matcher: params.Matcher, name: params.Name, subscriptions: params.Subscriptions, + topics: params.Topics, } } func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + now := time.Now().UTC().Round(time.Second) + switch r.Method { default: http.Error(w, http.StatusText(http.StatusMethodNotAllowed), http.StatusMethodNotAllowed) case http.MethodPost: req := NewRequest() - if err := req.bind(r); err != nil { + + var err error + if err = req.bind(r); err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return } + // TODO(toby3d): send denied ping to callback if it's not accepted by hub + s := new(domain.Subscription) - req.populate(s) + req.populate(s, now) switch req.Mode { - case domain.ModeSubscribe: - h.hub.Subscribe(r.Context(), *s) - case domain.ModeUnsubscribe: - h.hub.Unsubscribe(r.Context(), *s) + case domain.ModeSubscribe, domain.ModeUnsubscribe: + if _, err = h.hub.Verify(r.Context(), *s, req.Mode); err != nil { + r.Clone(context.WithValue(r.Context(), "error", err)) + + w.WriteHeader(http.StatusAccepted) + + return + } + + switch req.Mode { + case domain.ModeSubscribe: + _, err = h.subscriptions.Subscribe(r.Context(), *s) + case domain.ModeUnsubscribe: + _, err = h.subscriptions.Unsubscribe(r.Context(), *s) + } case domain.ModePublish: - go h.hub.Publish(r.Context(), req.Topic) + _, err = h.topics.Publish(r.Context(), req.Topic) + } + + if err != nil { + r.Clone(context.WithValue(r.Context(), "error", err)) } w.WriteHeader(http.StatusAccepted) case "", http.MethodGet: tags, _, _ := language.ParseAcceptLanguage(r.Header.Get(common.HeaderAcceptLanguage)) tag, _, _ := h.matcher.Match(tags...) - baseOf := template.NewBaseOf(tag, h.name) - - var page template.Page - if r.URL.Query().Has(common.HubTopic) { - topic, err := domain.ParseTopic(r.URL.Query().Get(common.HubTopic)) - if err != nil { - http.Error(w, err.Error(), http.StatusBadRequest) - - return - } - - subscriptions, err := h.subscriptions.Fetch(r.Context(), *topic) - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - - return - } - - page = &template.Topic{ - BaseOf: baseOf, - Subscribers: len(subscriptions), - } - } else { - page = &template.Home{BaseOf: baseOf} - } w.Header().Set(common.HeaderContentType, common.MIMETextHTMLCharsetUTF8) - template.WriteTemplate(w, page) + template.WriteTemplate(w, &template.Home{BaseOf: template.NewBaseOf(tag, h.name)}) } } func NewRequest() *Request { return &Request{ Mode: domain.ModeUnd, - Callback: domain.Callback{}, + Callback: nil, Secret: domain.Secret{}, - Topic: domain.Topic{}, + Topic: nil, LeaseSeconds: DefaultRequestLeaseSeconds, } } @@ -148,54 +149,33 @@ func (r *Request) bind(req *http.Request) error { return fmt.Errorf("cannot parse %s: %w", common.HubMode, err) } + // NOTE(toby3d): hub.topic + if !req.PostForm.Has(common.HubTopic) { + return fmt.Errorf("%s parameter is required, but not provided", common.HubTopic) + } + + if r.Topic, err = url.Parse(req.PostForm.Get(common.HubTopic)); err != nil { + return fmt.Errorf("cannot parse %s: %w", common.HubTopic, err) + } + switch r.Mode { case domain.ModePublish: - if !req.PostForm.Has(common.HubURL) { - return fmt.Errorf("%s parameter for %s %s is required, but not provided", common.HubURL, - r.Mode, common.HubMode) - } - - topic, err := domain.ParseTopic(req.PostForm.Get(common.HubURL)) - if err != nil { - return fmt.Errorf("cannot parse %s: %w", common.HubTopic, err) - } - - r.Topic = *topic case domain.ModeSubscribe, domain.ModeUnsubscribe: - for _, k := range []string{common.HubTopic, common.HubCallback} { - if req.PostForm.Has(k) { - continue - } - - return fmt.Errorf("%s parameter is required, but not provided", k) - } - - // NOTE(toby3d): hub.topic - topic, err := domain.ParseTopic(req.PostForm.Get(common.HubTopic)) - if err != nil { - return fmt.Errorf("cannot parse %s: %w", common.HubTopic, err) - } - - r.Topic = *topic - // NOTE(toby3d): hub.callback - callback, err := domain.ParseCallback(req.PostForm.Get(common.HubCallback)) - if err != nil { + if !req.PostForm.Has(common.HubCallback) { + return fmt.Errorf("%s parameter is required, but not provided", common.HubCallback) + } + + if r.Callback, err = url.Parse(req.PostForm.Get(common.HubCallback)); err != nil { return fmt.Errorf("cannot parse %s: %w", common.HubCallback, err) } - r.Callback = *callback - // NOTE(toby3d): hub.lease_seconds if r.Mode != domain.ModeUnsubscribe && req.PostForm.Has(common.HubLeaseSeconds) { - var ls uint64 - if ls, err = strconv.ParseUint(req.PostForm.Get(common.HubLeaseSeconds), 10, 64); err != nil { + r.LeaseSeconds, err = strconv.ParseFloat(req.PostForm.Get(common.HubLeaseSeconds), 64) + if err != nil { return fmt.Errorf("cannot parse %s: %w", common.HubLeaseSeconds, err) } - - if ls != 0 { - r.LeaseSeconds = domain.NewLeaseSeconds(uint(ls)) - } } // NOTE(toby3d): hub.secret @@ -218,11 +198,13 @@ func (r *Request) bind(req *http.Request) error { return nil } -func (r Request) populate(s *domain.Subscription) { +func (r Request) populate(s *domain.Subscription, ts time.Time) { + s.CreatedAt = ts + s.UpdatedAt = ts + s.ExpiredAt = ts.Add(time.Duration(r.LeaseSeconds) * time.Second).Round(time.Second) s.Callback = r.Callback - s.LeaseSeconds = r.LeaseSeconds - s.Secret = r.Secret s.Topic = r.Topic + s.Secret = r.Secret } func NewResponse(t domain.Topic, err error) *Response { diff --git a/internal/hub/delivery/http/hub_http_test.go b/internal/hub/delivery/http/hub_http_test.go index f4b2149..cfdd00d 100644 --- a/internal/hub/delivery/http/hub_http_test.go +++ b/internal/hub/delivery/http/hub_http_test.go @@ -2,7 +2,6 @@ package http_test import ( "context" - "errors" "fmt" "net/http" "net/http/httptest" @@ -10,15 +9,16 @@ import ( "strings" "testing" - "github.com/google/go-cmp/cmp" "golang.org/x/text/language" "source.toby3d.me/toby3d/hub/internal/common" "source.toby3d.me/toby3d/hub/internal/domain" delivery "source.toby3d.me/toby3d/hub/internal/hub/delivery/http" - ucase "source.toby3d.me/toby3d/hub/internal/hub/usecase" - "source.toby3d.me/toby3d/hub/internal/subscription" + hubucase "source.toby3d.me/toby3d/hub/internal/hub/usecase" subscriptionmemoryrepo "source.toby3d.me/toby3d/hub/internal/subscription/repository/memory" + subscriptionucase "source.toby3d.me/toby3d/hub/internal/subscription/usecase" + topicmemoryrepo "source.toby3d.me/toby3d/hub/internal/topic/repository/memory" + topicucase "source.toby3d.me/toby3d/hub/internal/topic/usecase" ) func TestHandler_ServeHTTP_Subscribe(t *testing.T) { @@ -31,22 +31,27 @@ func TestHandler_ServeHTTP_Subscribe(t *testing.T) { in := domain.TestSubscription(t, srv.URL+"/lipsum") subscriptions := subscriptionmemoryrepo.NewMemorySubscriptionRepository() - hub := ucase.NewHubUseCase(subscriptions, srv.Client(), &url.URL{Scheme: "https", Host: "hub.exmaple.com"}) + topics := topicmemoryrepo.NewMemoryTopicRepository() + hub := hubucase.NewHubUseCase(topics, subscriptions, srv.Client(), &url.URL{ + Scheme: "https", + Host: "hub.exmaple.com", + Path: "/", + }) payload := make(url.Values) domain.ModeSubscribe.AddQuery(payload) in.AddQuery(payload) - req := httptest.NewRequest(http.MethodPost, "https://hub.example.com/", - strings.NewReader(payload.Encode())) + req := httptest.NewRequest(http.MethodPost, "https://hub.example.com/", strings.NewReader(payload.Encode())) req.Header.Set(common.HeaderContentType, common.MIMEApplicationFormCharsetUTF8) w := httptest.NewRecorder() delivery.NewHandler(delivery.NewHandlerParams{ Hub: hub, - Subscriptions: subscriptions, + Subscriptions: subscriptionucase.NewSubscriptionUseCase(subscriptions, topics), + Topics: topicucase.NewTopicUseCase(topics, srv.Client()), Matcher: language.NewMatcher([]language.Tag{language.English}), - Name: "hub", + Name: "WebSub", }).ServeHTTP(w, req) resp := w.Result() @@ -54,49 +59,45 @@ func TestHandler_ServeHTTP_Subscribe(t *testing.T) { if expect := http.StatusAccepted; resp.StatusCode != expect { t.Errorf("%s %s = %d, want %d", req.Method, req.RequestURI, resp.StatusCode, expect) } - - out, err := subscriptions.Get(context.Background(), in.SUID()) - if err != nil { - t.Fatal(err) - } - - if diff := cmp.Diff(out, in, cmp.AllowUnexported(domain.Secret{}, domain.Callback{}, domain.Topic{}, - domain.LeaseSeconds{})); diff != "" { - t.Error(diff) - } } func TestHandler_ServeHTTP_Unsubscribe(t *testing.T) { t.Parallel() srv := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set(common.HeaderContentType, common.MIMETextPlainCharsetUTF8) fmt.Fprint(w, r.URL.Query().Get(common.HubChallenge)) })) t.Cleanup(srv.Close) in := domain.TestSubscription(t, srv.URL+"/lipsum") subscriptions := subscriptionmemoryrepo.NewMemorySubscriptionRepository() + topics := topicmemoryrepo.NewMemoryTopicRepository() if err := subscriptions.Create(context.Background(), in.SUID(), *in); err != nil { t.Fatal(err) } - hub := ucase.NewHubUseCase(subscriptions, srv.Client(), &url.URL{Scheme: "https", Host: "hub.exmaple.com"}) + hub := hubucase.NewHubUseCase(topics, subscriptions, srv.Client(), &url.URL{ + Scheme: "https", + Host: "hub.exmaple.com", + Path: "/", + }) payload := make(url.Values) domain.ModeUnsubscribe.AddQuery(payload) in.AddQuery(payload) - req := httptest.NewRequest(http.MethodPost, "https://hub.example.com/", - strings.NewReader(payload.Encode())) + req := httptest.NewRequest(http.MethodPost, "https://hub.example.com/", strings.NewReader(payload.Encode())) req.Header.Set(common.HeaderContentType, common.MIMEApplicationFormCharsetUTF8) w := httptest.NewRecorder() delivery.NewHandler(delivery.NewHandlerParams{ Hub: hub, - Subscriptions: subscriptions, + Subscriptions: subscriptionucase.NewSubscriptionUseCase(subscriptions, topics), + Topics: topicucase.NewTopicUseCase(topics, srv.Client()), Matcher: language.NewMatcher([]language.Tag{language.English}), - Name: "hub", + Name: "WebSub", }).ServeHTTP(w, req) resp := w.Result() @@ -104,8 +105,4 @@ func TestHandler_ServeHTTP_Unsubscribe(t *testing.T) { if expect := http.StatusAccepted; resp.StatusCode != expect { t.Errorf("%s %s = %d, want %d", req.Method, req.RequestURI, resp.StatusCode, expect) } - - if _, err := subscriptions.Get(context.Background(), in.SUID()); !errors.Is(err, subscription.ErrNotExist) { - t.Errorf("want %s, got %s", subscription.ErrNotExist, err) - } } diff --git a/internal/hub/usecase.go b/internal/hub/usecase.go index 4dcb873..a3f0d6a 100644 --- a/internal/hub/usecase.go +++ b/internal/hub/usecase.go @@ -8,9 +8,8 @@ import ( ) type UseCase interface { - Subscribe(ctx context.Context, subscription domain.Subscription) (bool, error) - Unsubscribe(ctx context.Context, subscription domain.Subscription) (bool, error) - Publish(ctx context.Context, t domain.Topic) error + Verify(ctx context.Context, subscription domain.Subscription, mode domain.Mode) (bool, error) + ListenAndServe(ctx context.Context) error } var ( diff --git a/internal/hub/usecase/hub_ucase.go b/internal/hub/usecase/hub_ucase.go index 5e4eef3..f5cbb4a 100644 --- a/internal/hub/usecase/hub_ucase.go +++ b/internal/hub/usecase/hub_ucase.go @@ -3,21 +3,26 @@ package usecase import ( "bytes" "context" - "errors" + "crypto/hmac" + "encoding/hex" "fmt" "io" "math/rand" "net/http" "net/url" + "strconv" + "time" "source.toby3d.me/toby3d/hub/internal/common" "source.toby3d.me/toby3d/hub/internal/domain" "source.toby3d.me/toby3d/hub/internal/hub" "source.toby3d.me/toby3d/hub/internal/subscription" + "source.toby3d.me/toby3d/hub/internal/topic" ) type hubUseCase struct { subscriptions subscription.Repository + topics topic.Repository client *http.Client self *url.URL } @@ -27,11 +32,12 @@ const ( lengthMax = 32 ) -func NewHubUseCase(subscriptions subscription.Repository, client *http.Client, self *url.URL) hub.UseCase { +func NewHubUseCase(t topic.Repository, s subscription.Repository, c *http.Client, u *url.URL) hub.UseCase { return &hubUseCase{ - subscriptions: subscriptions, - client: client, - self: self, + client: c, + self: u, + topics: t, + subscriptions: s, } } @@ -41,15 +47,15 @@ func (ucase *hubUseCase) Verify(ctx context.Context, s domain.Subscription, mode return false, fmt.Errorf("cannot generate hub.challenge: %w", err) } - u := s.Callback.URL() + u, _ := url.Parse(s.Callback.String()) q := u.Query() - for _, w := range []domain.QueryAdder{mode, s.Topic, challenge} { - w.AddQuery(q) - } + mode.AddQuery(q) + q.Add(common.HubTopic, s.Topic.String()) + challenge.AddQuery(q) if mode == domain.ModeSubscribe { - s.LeaseSeconds.AddQuery(q) + q.Add(common.HubLeaseSeconds, strconv.FormatFloat(s.LeaseSeconds(), 'g', 0, 64)) } u.RawQuery = q.Encode() @@ -77,118 +83,104 @@ func (ucase *hubUseCase) Verify(ctx context.Context, s domain.Subscription, mode return false, fmt.Errorf("cannot verify subscriber response body: %w", err) } - if !challenge.Equal(body) { + if !challenge.Equal(string(body)) { return false, fmt.Errorf("%w: got '%s', want '%s'", hub.ErrChallenge, body, *challenge) } return true, nil } -func (ucase *hubUseCase) Subscribe(ctx context.Context, s domain.Subscription) (bool, error) { - var err error - if _, err = ucase.Verify(ctx, s, domain.ModeSubscribe); err != nil { - return false, fmt.Errorf("cannot validate subscription request: %w", err) - } +func (ucase *hubUseCase) ListenAndServe(ctx context.Context) error { + ticker := time.NewTicker(time.Second) + defer ticker.Stop() - suid := s.SUID() + for ts := range ticker.C { + ts = ts.Round(time.Second) - if _, err = ucase.subscriptions.Get(ctx, suid); err != nil { - if !errors.Is(err, subscription.ErrNotExist) { - return false, fmt.Errorf("cannot check exists subscriptions: %w", err) + topics, err := ucase.topics.Fetch(ctx) + if err != nil { + return fmt.Errorf("cannot fetch topics: %w", err) } - if err = ucase.subscriptions.Create(ctx, suid, s); err != nil { - return false, fmt.Errorf("cannot create a new subscription: %w", err) + for i := range topics { + subscriptions, err := ucase.subscriptions.Fetch(ctx, &topics[i]) + if err != nil { + return fmt.Errorf("cannot fetch subscriptions: %w", err) + } + + for j := range subscriptions { + if subscriptions[j].Expired(ts) { + if err = ucase.subscriptions.Delete(ctx, subscriptions[j].SUID()); err != nil { + return fmt.Errorf("cannot remove expired subcription: %w", err) + } + + continue + } + + if subscriptions[j].Synced(topics[i]) { + continue + } + + go ucase.push(ctx, subscriptions[j], topics[i], ts) + } } - - return true, nil - } - - if err = ucase.subscriptions.Update(ctx, suid, func(buf *domain.Subscription) (*domain.Subscription, error) { - buf.LeaseSeconds = s.LeaseSeconds - buf.Secret = s.Secret - - return buf, nil - }); err != nil { - return false, fmt.Errorf("cannot update subscription: %w", err) - } - - return false, nil -} - -func (ucase *hubUseCase) Unsubscribe(ctx context.Context, s domain.Subscription) (bool, error) { - var err error - if _, err = ucase.Verify(ctx, s, domain.ModeUnsubscribe); err != nil { - return false, fmt.Errorf("cannot validate unsubscription request: %w", err) - } - - if err = ucase.subscriptions.Delete(ctx, s.SUID()); err != nil { - return false, fmt.Errorf("cannot remove subscription: %w", err) - } - - return true, nil -} - -func (ucase *hubUseCase) Publish(ctx context.Context, t domain.Topic) error { - resp, err := ucase.client.Get(t.String()) - if err != nil { - return fmt.Errorf("cannot fetch topic payload for publishing: %w", err) - } - - push := domain.Push{ContentType: resp.Header.Get(common.HeaderContentType)} - - canonicalTopic, err := domain.ParseTopic(resp.Request.URL.String()) - if err != nil { - return fmt.Errorf("cannot parse canonical topic URL: %w", err) - } - - push.Self = *canonicalTopic - - if push.Content, err = io.ReadAll(resp.Body); err != nil { - return fmt.Errorf("cannot read topic body: %w", err) - } - - subscriptions, err := ucase.subscriptions.Fetch(ctx, t) - if err != nil { - return fmt.Errorf("cannot fetch subscriptions for topic: %w", err) - } - - for i := range subscriptions { - ucase.Push(ctx, push, subscriptions[i]) } return nil } -func (ucase *hubUseCase) Push(ctx context.Context, p domain.Push, s domain.Subscription) (bool, error) { - req, err := http.NewRequest(http.MethodPost, s.Callback.String(), bytes.NewReader(p.Content)) +func (ucase *hubUseCase) push(ctx context.Context, s domain.Subscription, t domain.Topic, ts time.Time) (bool, error) { + req, err := http.NewRequest(http.MethodPost, s.Callback.String(), bytes.NewReader(t.Content)) if err != nil { - return false, fmt.Errorf("cannot build push request: %w", err) + return false, fmt.Errorf("cannot build request: %w", err) } - req.Header.Set(common.HeaderContentType, p.ContentType) - req.Header.Set(common.HeaderLink, `<`+ucase.self.String()+`>; rel="hub", <`+p.Self.String()+`>; rel="self"`) - p.SetXHubSignatureHeader(req, domain.AlgorithmSHA512, s.Secret) + req.Header.Set(common.HeaderContentType, t.ContentType) + req.Header.Set(common.HeaderLink, `<`+ucase.self.String()+`>; rel="hub", <`+s.Topic.String()+`>; rel="self"`) + setXHubSignatureHeader(req, domain.AlgorithmSHA512, s.Secret, t.Content) resp, err := ucase.client.Do(req) if err != nil { return false, fmt.Errorf("cannot push: %w", err) } - // The subscriber's callback URL MAY return an HTTP 410 code to indicate that the subscription has been - // deleted, and the hub MAY terminate the subscription if it receives that code as a response. + suid := s.SUID() + + // The subscriber's callback URL MAY return an HTTP 410 code to indicate + // that the subscription has been deleted, and the hub MAY terminate the + // subscription if it receives that code as a response. if resp.StatusCode == http.StatusGone { - if err = ucase.subscriptions.Delete(ctx, s.SUID()); err != nil { + if err = ucase.subscriptions.Delete(ctx, suid); err != nil { return false, fmt.Errorf("cannot remove deleted subscription: %w", err) } return true, nil } - // The subscriber's callback URL MUST return an HTTP 2xx response code to indicate a success. + // The subscriber's callback URL MUST return an HTTP 2xx response code + // to indicate a success. if resp.StatusCode < http.StatusOK || resp.StatusCode >= http.StatusMultipleChoices { return false, hub.ErrStatus } + if err = ucase.subscriptions.Update(ctx, suid, func(tx *domain.Subscription) (*domain.Subscription, error) { + tx.SyncedAt = t.UpdatedAt + + return tx, nil + }); err != nil { + return false, fmt.Errorf("cannot sync sybsciption status: %w", err) + } + return true, nil } + +func setXHubSignatureHeader(req *http.Request, alg domain.Algorithm, secret domain.Secret, body []byte) { + if !secret.IsSet() || alg == domain.AlgorithmUnd { + return + } + + h := hmac.New(alg.Hash, []byte(secret.String())) + h.Write(body) + + req.Header.Set(common.HeaderXHubSignature, alg.String()+"="+hex.EncodeToString(h.Sum(nil))) +} diff --git a/internal/hub/usecase/hub_ucase_test.go b/internal/hub/usecase/hub_ucase_test.go new file mode 100644 index 0000000..eefd117 --- /dev/null +++ b/internal/hub/usecase/hub_ucase_test.go @@ -0,0 +1,43 @@ +package usecase_test + +import ( + "context" + "fmt" + "net/http" + "net/http/httptest" + "net/url" + "testing" + + "source.toby3d.me/toby3d/hub/internal/common" + "source.toby3d.me/toby3d/hub/internal/domain" + hubucase "source.toby3d.me/toby3d/hub/internal/hub/usecase" + subscriptionmemoryrepo "source.toby3d.me/toby3d/hub/internal/subscription/repository/memory" + topicmemoryrepo "source.toby3d.me/toby3d/hub/internal/topic/repository/memory" +) + +func TestHubUseCase_Verify(t *testing.T) { + t.Parallel() + + srv := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set(common.HeaderContentType, common.MIMETextPlainCharsetUTF8) + fmt.Fprint(w, r.FormValue(common.HubChallenge)) + })) + t.Cleanup(srv.Close) + + subscriptions := subscriptionmemoryrepo.NewMemorySubscriptionRepository() + topics := topicmemoryrepo.NewMemoryTopicRepository() + subscription := domain.TestSubscription(t, srv.URL) + + ok, err := hubucase.NewHubUseCase(topics, subscriptions, srv.Client(), &url.URL{ + Scheme: "https", + Host: "hub.example.com", + Path: "/", + }).Verify(context.Background(), *subscription, domain.ModeSubscribe) + if err != nil { + t.Fatal(err) + } + + if !ok { + t.Errorf("want %t, got %t", true, ok) + } +} diff --git a/internal/middleware/logfmt.go b/internal/middleware/logfmt.go index 3d95444..bcedabb 100644 --- a/internal/middleware/logfmt.go +++ b/internal/middleware/logfmt.go @@ -68,6 +68,7 @@ func LogFmtWithConfig(config LogFmtConfig) Interceptor { next(rw, r) + rw.error, _ = r.Context().Value("error").(error) end := time.Now().UTC() encoder.EncodeKeyvals( diff --git a/internal/subscription/repository.go b/internal/subscription/repository.go index 83b28bf..f5ecbe0 100644 --- a/internal/subscription/repository.go +++ b/internal/subscription/repository.go @@ -13,7 +13,7 @@ type ( Repository interface { Create(ctx context.Context, suid domain.SUID, subscription domain.Subscription) error Get(ctx context.Context, suid domain.SUID) (*domain.Subscription, error) - Fetch(ctx context.Context, topic domain.Topic) ([]domain.Subscription, error) + Fetch(ctx context.Context, topic *domain.Topic) ([]domain.Subscription, error) Update(ctx context.Context, suid domain.SUID, update UpdateFunc) error Delete(ctx context.Context, suid domain.SUID) error } diff --git a/internal/subscription/repository/memory/memory_subscription.go b/internal/subscription/repository/memory/memory_subscription.go index 5e00cc6..273353e 100644 --- a/internal/subscription/repository/memory/memory_subscription.go +++ b/internal/subscription/repository/memory/memory_subscription.go @@ -15,10 +15,15 @@ type memorySubscriptionRepository struct { subscriptions map[domain.SUID]domain.Subscription } -// Create implements subscription.Repository +func NewMemorySubscriptionRepository() subscription.Repository { + return &memorySubscriptionRepository{ + mutex: new(sync.RWMutex), + subscriptions: make(map[domain.SUID]domain.Subscription), + } +} + func (repo *memorySubscriptionRepository) Create(ctx context.Context, suid domain.SUID, s domain.Subscription) error { - _, err := repo.Get(ctx, suid) - if err != nil { + if _, err := repo.Get(ctx, suid); err != nil { if !errors.Is(err, subscription.ErrNotExist) { return fmt.Errorf("cannot create subscription: %w", err) } @@ -34,7 +39,6 @@ func (repo *memorySubscriptionRepository) Create(ctx context.Context, suid domai return nil } -// Delete implements subscription.Repository func (repo *memorySubscriptionRepository) Delete(ctx context.Context, suid domain.SUID) error { if _, err := repo.Get(ctx, suid); err != nil { if !errors.Is(err, subscription.ErrNotExist) { @@ -52,7 +56,6 @@ func (repo *memorySubscriptionRepository) Delete(ctx context.Context, suid domai return nil } -// Get implements subscription.Repository func (repo *memorySubscriptionRepository) Get(_ context.Context, suid domain.SUID) (*domain.Subscription, error) { repo.mutex.RLock() defer repo.mutex.RUnlock() @@ -64,14 +67,14 @@ func (repo *memorySubscriptionRepository) Get(_ context.Context, suid domain.SUI return nil, subscription.ErrNotExist } -func (repo *memorySubscriptionRepository) Fetch(ctx context.Context, t domain.Topic) ([]domain.Subscription, error) { +func (repo *memorySubscriptionRepository) Fetch(ctx context.Context, t *domain.Topic) ([]domain.Subscription, error) { repo.mutex.RLock() defer repo.mutex.RUnlock() out := make([]domain.Subscription, 0) for _, s := range repo.subscriptions { - if !s.Topic.Equal(t) { + if t != nil && t.Self.String() != s.Topic.String() { continue } @@ -100,10 +103,3 @@ func (repo *memorySubscriptionRepository) Update(ctx context.Context, suid domai return nil } - -func NewMemorySubscriptionRepository() subscription.Repository { - return &memorySubscriptionRepository{ - mutex: new(sync.RWMutex), - subscriptions: make(map[domain.SUID]domain.Subscription), - } -} diff --git a/internal/subscription/usecase.go b/internal/subscription/usecase.go index 6910665..725ca9f 100644 --- a/internal/subscription/usecase.go +++ b/internal/subscription/usecase.go @@ -7,5 +7,6 @@ import ( ) type UseCase interface { - Fetch(ctx context.Context, topic domain.Topic) ([]domain.Subscription, error) + Subscribe(ctx context.Context, s domain.Subscription) (bool, error) + Unsubscribe(ctx context.Context, s domain.Subscription) (bool, error) } diff --git a/internal/subscription/usecase/subscription_ucase.go b/internal/subscription/usecase/subscription_ucase.go index f4f49e1..727ca10 100644 --- a/internal/subscription/usecase/subscription_ucase.go +++ b/internal/subscription/usecase/subscription_ucase.go @@ -2,27 +2,94 @@ package usecase import ( "context" + "errors" "fmt" + "io" + "net/http" + "time" + "source.toby3d.me/toby3d/hub/internal/common" "source.toby3d.me/toby3d/hub/internal/domain" "source.toby3d.me/toby3d/hub/internal/subscription" + "source.toby3d.me/toby3d/hub/internal/topic" ) type subscriptionUseCase struct { + topics topic.Repository subscriptions subscription.Repository + client *http.Client } -func NewSubscriptionUseCase(subscriptions subscription.Repository) subscription.UseCase { +func NewSubscriptionUseCase(subs subscription.Repository, tops topic.Repository, c *http.Client) subscription.UseCase { return &subscriptionUseCase{ - subscriptions: subscriptions, + subscriptions: subs, + topics: tops, + client: c, } } -func (ucase *subscriptionUseCase) Fetch(ctx context.Context, topic domain.Topic) ([]domain.Subscription, error) { - out, err := ucase.subscriptions.Fetch(ctx, topic) - if err != nil { - return nil, fmt.Errorf("cannot fetch subscriptions for topic: %w", err) +func (ucase *subscriptionUseCase) Subscribe(ctx context.Context, s domain.Subscription) (bool, error) { + now := time.Now().UTC().Round(time.Second) + + if _, err := ucase.topics.Get(context.Background(), s.Topic); err != nil { + if !errors.Is(err, topic.ErrNotExist) { + return false, fmt.Errorf("cannot check subscription topic: %w", err) + } + + resp, err := ucase.client.Get(s.Topic.String()) + if err != nil { + return false, fmt.Errorf("cannot fetch a new topic subscription content: %w", err) + } + + content, err := io.ReadAll(resp.Body) + if err != nil { + return false, fmt.Errorf("cannot read a new topic subscription content: %w", err) + } + + if err = ucase.topics.Create(ctx, s.Topic, domain.Topic{ + CreatedAt: now, + UpdatedAt: now, + Self: s.Topic, + ContentType: resp.Header.Get(common.HeaderContentType), + Content: content, + }); err != nil { + return false, fmt.Errorf("cannot create topic for subsciption: %w", err) + } } - return out, nil + if err := ucase.subscriptions.Create(ctx, s.SUID(), domain.Subscription{ + CreatedAt: now, + UpdatedAt: now, + SyncedAt: now, + ExpiredAt: s.ExpiredAt, + Callback: s.Callback, + Topic: s.Topic, + Secret: s.Secret, + }); err != nil { + if !errors.Is(err, subscription.ErrExist) { + return false, fmt.Errorf("cannot create a new subscription: %w", err) + } + + if err = ucase.subscriptions.Update(ctx, s.SUID(), func(tx *domain.Subscription) (*domain.Subscription, + error, + ) { + tx.UpdatedAt = now + tx.ExpiredAt = now.Add(time.Duration(s.LeaseSeconds()) * time.Second) + tx.Secret = s.Secret + + return tx, nil + }); err != nil { + return false, fmt.Errorf("cannot resubscribe existing subscription: %w", err) + } + } + + return true, nil +} + +func (ucase *subscriptionUseCase) Unsubscribe(ctx context.Context, s domain.Subscription) (bool, error) { + if err := ucase.subscriptions.Delete(ctx, s.SUID()); err != nil { + return false, fmt.Errorf("cannot unsubscribe: %w", err) + } + + return true, nil } diff --git a/internal/subscription/usecase/subscription_ucase_test.go b/internal/subscription/usecase/subscription_ucase_test.go new file mode 100644 index 0000000..8e7c42e --- /dev/null +++ b/internal/subscription/usecase/subscription_ucase_test.go @@ -0,0 +1,73 @@ +package usecase_test + +import ( + "context" + "testing" + + "source.toby3d.me/toby3d/hub/internal/domain" + subscriptionmemoryrepo "source.toby3d.me/toby3d/hub/internal/subscription/repository/memory" + "source.toby3d.me/toby3d/hub/internal/subscription/usecase" + topicmemoryrepo "source.toby3d.me/toby3d/hub/internal/topic/repository/memory" +) + +func TestSubscriptionUseCase_Subscribe(t *testing.T) { + t.Parallel() + + subscription := domain.TestSubscription(t, "https://example.com/") + topics := topicmemoryrepo.NewMemoryTopicRepository() + subscriptions := subscriptionmemoryrepo.NewMemorySubscriptionRepository() + + ucase := usecase.NewSubscriptionUseCase(subscriptions, topics) + + ok, err := ucase.Subscribe(context.Background(), *subscription) + if err != nil { + t.Fatal(err) + } + + if !ok { + t.Errorf("want %t, got %t", true, ok) + } + + if _, err := subscriptions.Get(context.Background(), subscription.SUID()); err != nil { + t.Fatal(err) + } + + t.Run("resubscribe", func(t *testing.T) { + t.Parallel() + + ok, err := ucase.Subscribe(context.Background(), *subscription) + if err != nil { + t.Fatal(err) + } + + if !ok { + t.Errorf("want %t, got %t", true, ok) + } + }) +} + +func TestSubscriptionUseCase_Unsubscribe(t *testing.T) { + t.Parallel() + + subscription := domain.TestSubscription(t, "https://example.com/") + topics := topicmemoryrepo.NewMemoryTopicRepository() + subscriptions := subscriptionmemoryrepo.NewMemorySubscriptionRepository() + + if err := subscriptions.Create(context.Background(), subscription.SUID(), *subscription); err != nil { + t.Fatal(err) + } + + ok, err := usecase.NewSubscriptionUseCase(subscriptions, topics). + Unsubscribe(context.Background(), *subscription) + if err != nil { + t.Fatal(err) + } + + if !ok { + t.Errorf("want %t, got %t", true, ok) + } + + if _, err := subscriptions.Get(context.Background(), subscription.SUID()); err == nil { + t.Error("want error, got nil") + } +} diff --git a/internal/topic/repository.go b/internal/topic/repository.go new file mode 100644 index 0000000..e8371d4 --- /dev/null +++ b/internal/topic/repository.go @@ -0,0 +1,25 @@ +package topic + +import ( + "context" + "errors" + "net/url" + + "source.toby3d.me/toby3d/hub/internal/domain" +) + +type ( + UpdateFunc func(t *domain.Topic) (*domain.Topic, error) + + Repository interface { + Create(ctx context.Context, u *url.URL, topic domain.Topic) error + Update(ctx context.Context, u *url.URL, update UpdateFunc) error + Fetch(ctx context.Context) ([]domain.Topic, error) + Get(ctx context.Context, u *url.URL) (*domain.Topic, error) + } +) + +var ( + ErrExist = errors.New("topic already exists") + ErrNotExist = errors.New("topic does not exist") +) diff --git a/internal/topic/repository/memory/memory_topic.go b/internal/topic/repository/memory/memory_topic.go new file mode 100644 index 0000000..94eeec5 --- /dev/null +++ b/internal/topic/repository/memory/memory_topic.go @@ -0,0 +1,85 @@ +package memory + +import ( + "context" + "errors" + "fmt" + "net/url" + "sync" + + "source.toby3d.me/toby3d/hub/internal/domain" + "source.toby3d.me/toby3d/hub/internal/topic" +) + +type memoryTopicRepository struct { + mutex *sync.RWMutex + topics map[string]domain.Topic +} + +func NewMemoryTopicRepository() topic.Repository { + return &memoryTopicRepository{ + mutex: new(sync.RWMutex), + topics: make(map[string]domain.Topic), + } +} + +func (repo *memoryTopicRepository) Update(ctx context.Context, u *url.URL, update topic.UpdateFunc) error { + tx, err := repo.Get(ctx, u) + if err != nil { + return fmt.Errorf("cannot find updating topic: %w", err) + } + + repo.mutex.Lock() + defer repo.mutex.Unlock() + + result, err := update(tx) + if err != nil { + return fmt.Errorf("cannot update topic: %w", err) + } + + repo.topics[u.String()] = *result + + return nil +} + +func (repo *memoryTopicRepository) Create(ctx context.Context, u *url.URL, t domain.Topic) error { + _, err := repo.Get(ctx, u) + if err != nil && !errors.Is(err, topic.ErrNotExist) { + return fmt.Errorf("cannot get topic: %w", err) + } + + if err == nil { + return topic.ErrExist + } + + repo.mutex.Lock() + defer repo.mutex.Unlock() + + repo.topics[u.String()] = t + + return nil +} + +func (repo *memoryTopicRepository) Get(ctx context.Context, u *url.URL) (*domain.Topic, error) { + repo.mutex.RLock() + defer repo.mutex.RUnlock() + + if out, ok := repo.topics[u.String()]; ok { + return &out, nil + } + + return nil, topic.ErrNotExist +} + +func (repo *memoryTopicRepository) Fetch(_ context.Context) ([]domain.Topic, error) { + repo.mutex.RLock() + defer repo.mutex.RUnlock() + + out := make([]domain.Topic, 0) + + for _, t := range repo.topics { + out = append(out, t) + } + + return out, nil +} diff --git a/internal/topic/usecase.go b/internal/topic/usecase.go new file mode 100644 index 0000000..54007b7 --- /dev/null +++ b/internal/topic/usecase.go @@ -0,0 +1,10 @@ +package topic + +import ( + "context" + "net/url" +) + +type UseCase interface { + Publish(ctx context.Context, u *url.URL) (bool, error) +} diff --git a/internal/topic/usecase/topic_ucase.go b/internal/topic/usecase/topic_ucase.go new file mode 100644 index 0000000..d6edd00 --- /dev/null +++ b/internal/topic/usecase/topic_ucase.go @@ -0,0 +1,66 @@ +package usecase + +import ( + "context" + "errors" + "fmt" + "io" + "net/http" + "net/url" + "time" + + "source.toby3d.me/toby3d/hub/internal/common" + "source.toby3d.me/toby3d/hub/internal/domain" + "source.toby3d.me/toby3d/hub/internal/topic" +) + +type topicUseCase struct { + client *http.Client + topics topic.Repository +} + +func NewTopicUseCase(topics topic.Repository, client *http.Client) topic.UseCase { + return &topicUseCase{ + topics: topics, + client: client, + } +} + +func (ucase *topicUseCase) Publish(ctx context.Context, u *url.URL) (bool, error) { + now := time.Now().UTC().Round(time.Second) + + resp, err := ucase.client.Get(u.String()) + if err != nil { + return false, fmt.Errorf("cannot fetch publishing url: %w", err) + } + + content, err := io.ReadAll(resp.Body) + if err != nil { + return false, fmt.Errorf("cannot read topic response body: %w", err) + } + + if err := ucase.topics.Update(ctx, u, func(tx *domain.Topic) (*domain.Topic, error) { + tx.Self = resp.Request.URL + tx.UpdatedAt = now + tx.Content = content + tx.ContentType = resp.Header.Get(common.HeaderContentType) + + return tx, nil + }); err != nil { + if !errors.Is(err, topic.ErrNotExist) { + return false, fmt.Errorf("cannot publish exists topic: %w", err) + } + + if err = ucase.topics.Create(ctx, resp.Request.URL, domain.Topic{ + CreatedAt: now, + UpdatedAt: now, + Self: resp.Request.URL, + ContentType: resp.Header.Get(common.HeaderContentType), + Content: content, + }); err != nil { + return false, fmt.Errorf("cannot publish a new topic: %w", err) + } + } + + return true, nil +} diff --git a/internal/topic/usecase/topic_ucase_test.go b/internal/topic/usecase/topic_ucase_test.go new file mode 100644 index 0000000..ece96ef --- /dev/null +++ b/internal/topic/usecase/topic_ucase_test.go @@ -0,0 +1,43 @@ +package usecase_test + +import ( + "context" + "fmt" + "net/http" + "net/http/httptest" + "net/url" + "testing" + + "source.toby3d.me/toby3d/hub/internal/common" + "source.toby3d.me/toby3d/hub/internal/domain" + topicmemoryrepo "source.toby3d.me/toby3d/hub/internal/topic/repository/memory" + "source.toby3d.me/toby3d/hub/internal/topic/usecase" +) + +func TestTopicUseCase_Publish(t *testing.T) { + t.Parallel() + + topic := domain.TestTopic(t) + topics := topicmemoryrepo.NewMemoryTopicRepository() + srv := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.Header().Set(common.HeaderContentType, topic.ContentType) + fmt.Fprint(w, topic.Content) + })) + t.Cleanup(srv.Close) + + topic.Self, _ = url.Parse(srv.URL + "/") + + ok, err := usecase.NewTopicUseCase(topics, srv.Client()). + Publish(context.Background(), topic.Self) + if err != nil { + t.Fatal(err) + } + + if !ok { + t.Errorf("want %t, got %t", true, ok) + } + + if _, err := topics.Get(context.Background(), topic.Self); err != nil { + t.Fatal(err) + } +} diff --git a/internal/urlutil/shift_path_test.go b/internal/urlutil/shift_path_test.go new file mode 100644 index 0000000..f26bcfa --- /dev/null +++ b/internal/urlutil/shift_path_test.go @@ -0,0 +1,37 @@ +package urlutil_test + +import ( + "testing" + + "source.toby3d.me/toby3d/hub/internal/urlutil" +) + +func TestShiftPath(t *testing.T) { + t.Parallel() + + for name, tc := range map[string]struct { + input string + expect [2]string + }{ + "empty": {input: "", expect: [2]string{"", "/"}}, + "root": {input: "/", expect: [2]string{"", "/"}}, + "page": {input: "/foo", expect: [2]string{"foo", "/"}}, + "folder": {input: "/foo/bar", expect: [2]string{"foo", "/bar"}}, + } { + name, tc := name, tc + + t.Run(name, func(t *testing.T) { + t.Parallel() + + head, tail := urlutil.ShiftPath(tc.input) + + if head != tc.expect[0] { + t.Errorf("want '%s', got '%s'", tc.expect[0], head) + } + + if tail != tc.expect[1] { + t.Errorf("want '%s', got '%s'", tc.expect[1], tail) + } + }) + } +} diff --git a/main.go b/main.go index 7cd7894..c570ca5 100644 --- a/main.go +++ b/main.go @@ -5,6 +5,7 @@ package main import ( + "context" "embed" "io/fs" "log" @@ -24,6 +25,8 @@ import ( "source.toby3d.me/toby3d/hub/internal/middleware" subscriptionmemoryrepo "source.toby3d.me/toby3d/hub/internal/subscription/repository/memory" subscriptionucase "source.toby3d.me/toby3d/hub/internal/subscription/usecase" + topicmemoryrepo "source.toby3d.me/toby3d/hub/internal/topic/repository/memory" + topicucase "source.toby3d.me/toby3d/hub/internal/topic/usecase" "source.toby3d.me/toby3d/hub/internal/urlutil" ) @@ -50,6 +53,8 @@ func init() { } func main() { + ctx := context.Background() + config := new(domain.Config) if err := env.Parse(config, env.Options{Prefix: "HUB_"}); err != nil { logger.Fatalln(err) @@ -60,33 +65,41 @@ func main() { logger.Fatalln(err) } - client := &http.Client{Timeout: 1 * time.Second} + client := &http.Client{Timeout: 5 * time.Second} matcher := language.NewMatcher(message.DefaultCatalog.Languages()) subscriptions := subscriptionmemoryrepo.NewMemorySubscriptionRepository() - hub := hubhttprelivery.NewHandler(hubhttprelivery.NewHandlerParams{ - Hub: hubucase.NewHubUseCase(subscriptions, client, config.BaseURL), - Subscriptions: subscriptionucase.NewSubscriptionUseCase(subscriptions), + topics := topicmemoryrepo.NewMemoryTopicRepository() + topicService := topicucase.NewTopicUseCase(topics, client) + subscriptionService := subscriptionucase.NewSubscriptionUseCase(subscriptions, topics, client) + hubService := hubucase.NewHubUseCase(topics, subscriptions, client, config.BaseURL) + + handler := hubhttprelivery.NewHandler(hubhttprelivery.NewHandlerParams{ + Hub: hubService, + Subscriptions: subscriptionService, + Topics: topicService, Matcher: matcher, Name: config.Name, }) server := &http.Server{ - Addr: ":3000", + Addr: config.Bind, Handler: http.HandlerFunc(middleware.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { head, _ := urlutil.ShiftPath(r.URL.Path) switch head { case "": - hub.ServeHTTP(w, r) + handler.ServeHTTP(w, r) case "static": http.FileServer(http.FS(static)).ServeHTTP(w, r) } }).Intercept(middleware.LogFmt())), - ReadTimeout: 1 * time.Second, - WriteTimeout: 1 * time.Second, + ReadTimeout: 5 * time.Second, + WriteTimeout: 5 * time.Second, ErrorLog: logger, } + go hubService.ListenAndServe(ctx) + if err = server.ListenAndServe(); err != nil { logger.Fatalln(err) } diff --git a/web/static/manifest.webmanifest b/web/static/manifest.webmanifest index c1736c8..8a51461 100644 --- a/web/static/manifest.webmanifest +++ b/web/static/manifest.webmanifest @@ -1,24 +1,16 @@ { - "lang": "en", - "dir": "ltr", - "name": "WebSub", - "description": "A dead simple WebSub hub", - "short_name": "WebSub", - "icons": [{ - "purpose": "maskable", - "sizes": "192x192", - "src": "icon-192x192.png", - "type": "image/png" - },{ - "purpose": "maskable", - "sizes": "512x512", - "src": "icon-512x512.png", - "type": "image/png" - }], - "scope": "/", - "start_url": "/", - "display": "fullscreen", - "orientation": "landscape", - "theme_color": "#ff6fcf", - "background_color": "#ff6fcf" -} \ No newline at end of file + "icons": [ + { + "purpose": "maskable", + "sizes": "192x192", + "src": "icon-192x192.png", + "type": "image/png" + }, + { + "purpose": "maskable", + "sizes": "512x512", + "src": "icon-512x512.png", + "type": "image/png" + } + ] +} diff --git a/web/template/baseof.qtpl b/web/template/baseof.qtpl index dcf6b99..c388f1e 100644 --- a/web/template/baseof.qtpl +++ b/web/template/baseof.qtpl @@ -38,6 +38,9 @@ func NewBaseOf(lang language.Tag, name string) *BaseOf { {% func (p *BaseOf) head() %} {% comment %}https://evilmartians.com/chronicles/how-to-favicon-in-2021-six-files-that-fit-most-needs{% endcomment %} + + @@ -48,9 +51,6 @@ func NewBaseOf(lang language.Tag, name string) *BaseOf { - - {% endfunc %} {% func (p *BaseOf) body() %}{% endfunc %}