diff --git a/internal/common/common.go b/internal/common/common.go
index 1a97698..b3c68df 100644
--- a/internal/common/common.go
+++ b/internal/common/common.go
@@ -5,6 +5,8 @@ const (
MIMEApplicationFormCharsetUTF8 = MIMEApplicationForm + "; " + charsetUTF8
MIMETextHTML = "text/html"
MIMETextHTMLCharsetUTF8 = MIMETextHTML + "; " + charsetUTF8
+ MIMETextPlain = "text/plain"
+ MIMETextPlainCharsetUTF8 = MIMETextPlain + "; " + charsetUTF8
charsetUTF8 = "charset=UTF-8"
)
diff --git a/internal/domain/callback.go b/internal/domain/callback.go
deleted file mode 100644
index 1cae0f1..0000000
--- a/internal/domain/callback.go
+++ /dev/null
@@ -1,37 +0,0 @@
-package domain
-
-import (
- "fmt"
- "net/url"
-
- "source.toby3d.me/toby3d/hub/internal/common"
-)
-
-// Callback describes the URL at which a subscriber wishes to receive content
-// distribution requests.
-type Callback struct {
- callback *url.URL
-}
-
-func ParseCallback(str string) (*Callback, error) {
- u, err := url.Parse(str)
- if err != nil {
- return nil, fmt.Errorf("cannot parse string as callback URL: %w", err)
- }
-
- return &Callback{callback: u}, nil
-}
-
-func (c Callback) AddQuery(q url.Values) {
- q.Add(common.HubCallback, c.callback.String())
-}
-
-func (c Callback) URL() *url.URL {
- u, _ := url.Parse(c.callback.String())
-
- return u
-}
-
-func (c Callback) String() string {
- return c.callback.String()
-}
diff --git a/internal/domain/challenge.go b/internal/domain/challenge.go
index e6e45c8..6e01d59 100644
--- a/internal/domain/challenge.go
+++ b/internal/domain/challenge.go
@@ -1,7 +1,6 @@
package domain
import (
- "bytes"
"crypto/rand"
"encoding/base64"
"fmt"
@@ -11,7 +10,7 @@ import (
)
type Challenge struct {
- challenge []byte
+ challenge string
}
func NewChallenge(length uint8) (*Challenge, error) {
@@ -20,15 +19,15 @@ func NewChallenge(length uint8) (*Challenge, error) {
return nil, fmt.Errorf("cannot create a new challenge: %w", err)
}
- return &Challenge{challenge: []byte(base64.URLEncoding.EncodeToString(src))}, nil
+ return &Challenge{challenge: base64.URLEncoding.EncodeToString(src)}, nil
}
func (c Challenge) AddQuery(q url.Values) {
q.Add(common.HubChallenge, string(c.challenge))
}
-func (c Challenge) Equal(target []byte) bool {
- return bytes.Equal(c.challenge, target)
+func (c Challenge) Equal(target string) bool {
+ return c.challenge == target
}
func (c Challenge) String() string {
diff --git a/internal/domain/lease_seconds.go b/internal/domain/lease_seconds.go
deleted file mode 100644
index 0d12fd6..0000000
--- a/internal/domain/lease_seconds.go
+++ /dev/null
@@ -1,34 +0,0 @@
-package domain
-
-import (
- "net/url"
- "strconv"
-
- "source.toby3d.me/toby3d/hub/internal/common"
-)
-
-// LeaseSeconds describes a number of seconds for which the subscriber would
-// like to have the subscription active, given as a positive decimal integer.
-// Hubs MAY choose to respect this value or not, depending on their own
-// policies, and MAY set a default value if the subscriber omits the parameter.
-// This parameter MAY be present for unsubscription requests and MUST be ignored
-// by the hub in that case.
-type LeaseSeconds struct {
- leaseSeconds uint
-}
-
-func NewLeaseSeconds(raw uint) LeaseSeconds {
- return LeaseSeconds{leaseSeconds: raw}
-}
-
-func (ls LeaseSeconds) AddQuery(q url.Values) {
- if ls.leaseSeconds == 0 {
- return
- }
-
- q.Add(common.HubLeaseSeconds, strconv.FormatUint(uint64(ls.leaseSeconds), 10))
-}
-
-func (ls LeaseSeconds) IsZero() bool {
- return ls.leaseSeconds == 0
-}
diff --git a/internal/domain/push.go b/internal/domain/push.go
deleted file mode 100644
index 3c7e4a3..0000000
--- a/internal/domain/push.go
+++ /dev/null
@@ -1,26 +0,0 @@
-package domain
-
-import (
- "crypto/hmac"
- "encoding/hex"
- "hash"
- "net/http"
-
- "source.toby3d.me/toby3d/hub/internal/common"
-)
-
-type Push struct {
- Self Topic
- ContentType string
- Content []byte
-}
-
-func (p Push) SetXHubSignatureHeader(req *http.Request, alg Algorithm, secret Secret) {
- if alg == AlgorithmUnd || secret.secret == "" {
- return
- }
-
- h := func() hash.Hash { return alg.Hash() }
- req.Header.Set(common.HeaderXHubSignature, alg.algorithm+"="+hex.EncodeToString(hmac.New(h,
- []byte(secret.secret)).Sum(p.Content)))
-}
diff --git a/internal/domain/subscription.go b/internal/domain/subscription.go
index d4a708d..e1d02fc 100644
--- a/internal/domain/subscription.go
+++ b/internal/domain/subscription.go
@@ -1,46 +1,78 @@
package domain
import (
- "math/rand"
"net/url"
+ "strconv"
"testing"
+ "time"
+
+ "source.toby3d.me/toby3d/hub/internal/common"
)
// Subscription is a unique relation to a topic by a subscriber that indicates
// it should receive updates for that topic.
type Subscription struct {
- Topic Topic
- Callback Callback
- Secret Secret
- LeaseSeconds LeaseSeconds
-}
+ // First creation datetime
+ CreatedAt time.Time
-func (s Subscription) SUID() SUID {
- return NewSSID(s.Topic, s.Callback)
+ // Last updating datetime
+ UpdatedAt time.Time
+
+ // Datetime when subscription must be deleted
+ ExpiredAt time.Time
+
+ // Datetime synced with topic updating time
+ SyncedAt time.Time
+
+ Callback *url.URL
+ Topic *url.URL
+
+ Secret Secret
}
func (s Subscription) AddQuery(q url.Values) {
- for _, w := range []QueryAdder{s.Callback, s.Topic, s.LeaseSeconds, s.Secret} {
- w.AddQuery(q)
+ s.Secret.AddQuery(q)
+ q.Add(common.HubTopic, s.Topic.String())
+ q.Add(common.HubCallback, s.Callback.String())
+ q.Add(common.HubLeaseSeconds, strconv.FormatFloat(s.LeaseSeconds(), 'g', 0, 64))
+}
+
+func (s Subscription) SUID() SUID {
+ return SUID{
+ topic: s.Topic.String(),
+ callback: s.Callback.String(),
}
}
+func (s Subscription) LeaseSeconds() float64 {
+ return s.ExpiredAt.Sub(s.UpdatedAt).Round(time.Second).Seconds()
+}
+
+func (s Subscription) Synced(t Topic) bool {
+ return s.SyncedAt.Equal(t.UpdatedAt) || s.SyncedAt.After(t.UpdatedAt)
+}
+
+func (s Subscription) Expired(ts time.Time) bool {
+ return s.ExpiredAt.Before(ts)
+}
+
func TestSubscription(tb testing.TB, callbackUrl string) *Subscription {
tb.Helper()
- callback, err := ParseCallback(callbackUrl)
+ callback, err := url.Parse(callbackUrl)
if err != nil {
tb.Fatal(err)
}
+ ts := time.Now().UTC().Round(time.Second)
+ secret := TestSecret(tb)
+
return &Subscription{
- Topic: Topic{topic: &url.URL{
- Scheme: "https",
- Host: "example.com",
- Path: "lipsum",
- }},
- Callback: *callback,
- Secret: *TestSecret(tb),
- LeaseSeconds: NewLeaseSeconds(uint(rand.Intn(60))),
+ CreatedAt: ts,
+ UpdatedAt: ts,
+ ExpiredAt: ts.Add(10 * 24 * time.Hour).Round(time.Second),
+ Callback: callback,
+ Topic: &url.URL{Scheme: "https", Host: "example.com", Path: "/lipsum"},
+ Secret: *secret,
}
}
diff --git a/internal/domain/suid.go b/internal/domain/suid.go
index 091bb44..2172e9c 100644
--- a/internal/domain/suid.go
+++ b/internal/domain/suid.go
@@ -1,29 +1,25 @@
package domain
+import "net/url"
+
// SUID describes a subscription's unique key is the tuple ([Topic] URL,
// Subscriber [Callback] URL).
type SUID struct {
- suid [2]string
+ topic string
+ callback string
}
-func NewSSID(topic Topic, callback Callback) SUID {
+func NewSSID(topic Topic, callback *url.URL) SUID {
return SUID{
- suid: [2]string{topic.topic.String(), callback.callback.String()},
+ topic: topic.Self.String(),
+ callback: callback.String(),
}
}
func (suid SUID) Equal(target SUID) bool {
- for i := range suid.suid {
- if suid.suid[i] == target.suid[i] {
- continue
- }
-
- return false
- }
-
- return true
+ return suid.topic == target.topic && suid.callback == target.callback
}
func (suid SUID) GoString() string {
- return "domain.SUID(" + suid.suid[0] + ":" + suid.suid[1] + ")"
+ return "domain.SUID(" + suid.topic + ":" + suid.callback + ")"
}
diff --git a/internal/domain/topic.go b/internal/domain/topic.go
index b69b12a..716d36c 100644
--- a/internal/domain/topic.go
+++ b/internal/domain/topic.go
@@ -1,38 +1,43 @@
package domain
import (
- "fmt"
"net/url"
+ "testing"
+ "time"
"source.toby3d.me/toby3d/hub/internal/common"
)
-// Topic is a HTTP [RFC7230] (or HTTPS [RFC2818]) resource URL. The unit to
-// which one can subscribe to changes.
-//
-// [RFC7230]: https://tools.ietf.org/html/rfc7230
-// [RFC2818]: https://tools.ietf.org/html/rfc2818
type Topic struct {
- topic *url.URL
+ CreatedAt time.Time
+ UpdatedAt time.Time
+ Self *url.URL
+ ContentType string
+ Content []byte
}
-func ParseTopic(str string) (*Topic, error) {
- u, err := url.Parse(str)
- if err != nil {
- return nil, fmt.Errorf("cannot parse string as topic URL: %w", err)
- }
+func TestTopic(tb testing.TB) *Topic {
+ tb.Helper()
- return &Topic{topic: u}, nil
+ now := time.Now().UTC().Add(-1 * time.Hour)
+
+ return &Topic{
+ CreatedAt: now,
+ UpdatedAt: now,
+ Self: &url.URL{Scheme: "https", Host: "example.com", Path: "/"},
+ ContentType: "text/html",
+ Content: []byte("hello, world"),
+ }
}
func (t Topic) AddQuery(q url.Values) {
- q.Add(common.HubTopic, t.topic.String())
+ q.Add(common.HubTopic, t.Self.String())
}
func (t Topic) Equal(target Topic) bool {
- return t.topic.String() == target.topic.String()
+ return t.Self.String() == target.Self.String()
}
func (t Topic) String() string {
- return t.topic.String()
+ return t.Self.String()
}
diff --git a/internal/hub/delivery/http/hub_http.go b/internal/hub/delivery/http/hub_http.go
index 875497a..b637e57 100644
--- a/internal/hub/delivery/http/hub_http.go
+++ b/internal/hub/delivery/http/hub_http.go
@@ -1,6 +1,7 @@
package http
import (
+ "context"
"errors"
"fmt"
"net/http"
@@ -14,27 +15,29 @@ import (
"source.toby3d.me/toby3d/hub/internal/domain"
"source.toby3d.me/toby3d/hub/internal/hub"
"source.toby3d.me/toby3d/hub/internal/subscription"
+ "source.toby3d.me/toby3d/hub/internal/topic"
"source.toby3d.me/toby3d/hub/web/template"
)
type (
Request struct {
- Callback domain.Callback
- Topic domain.Topic
+ Callback *url.URL
+ Topic *url.URL
Secret domain.Secret
Mode domain.Mode
- LeaseSeconds domain.LeaseSeconds
+ LeaseSeconds float64
}
Response struct {
Mode domain.Mode
- Topic domain.Topic
Reason string
+ Topic domain.Topic
}
NewHandlerParams struct {
Hub hub.UseCase
Subscriptions subscription.UseCase
+ Topics topic.UseCase
Matcher language.Matcher
Name string
}
@@ -42,12 +45,13 @@ type (
Handler struct {
hub hub.UseCase
subscriptions subscription.UseCase
+ topics topic.UseCase
matcher language.Matcher
name string
}
)
-var DefaultRequestLeaseSeconds = domain.NewLeaseSeconds(uint(time.Duration(time.Hour * 24 * 10).Seconds())) // 10 days
+var DefaultRequestLeaseSeconds = time.Duration(10 * 24 * time.Hour).Seconds() // 10 days
var (
ErrHubMode = errors.New(common.HubMode + " MUST be " + domain.ModeSubscribe.String() + " or " +
@@ -61,74 +65,71 @@ func NewHandler(params NewHandlerParams) *Handler {
matcher: params.Matcher,
name: params.Name,
subscriptions: params.Subscriptions,
+ topics: params.Topics,
}
}
func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+ now := time.Now().UTC().Round(time.Second)
+
switch r.Method {
default:
http.Error(w, http.StatusText(http.StatusMethodNotAllowed), http.StatusMethodNotAllowed)
case http.MethodPost:
req := NewRequest()
- if err := req.bind(r); err != nil {
+
+ var err error
+ if err = req.bind(r); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
+ // TODO(toby3d): send denied ping to callback if it's not accepted by hub
+
s := new(domain.Subscription)
- req.populate(s)
+ req.populate(s, now)
switch req.Mode {
- case domain.ModeSubscribe:
- h.hub.Subscribe(r.Context(), *s)
- case domain.ModeUnsubscribe:
- h.hub.Unsubscribe(r.Context(), *s)
+ case domain.ModeSubscribe, domain.ModeUnsubscribe:
+ if _, err = h.hub.Verify(r.Context(), *s, req.Mode); err != nil {
+ r.Clone(context.WithValue(r.Context(), "error", err))
+
+ w.WriteHeader(http.StatusAccepted)
+
+ return
+ }
+
+ switch req.Mode {
+ case domain.ModeSubscribe:
+ _, err = h.subscriptions.Subscribe(r.Context(), *s)
+ case domain.ModeUnsubscribe:
+ _, err = h.subscriptions.Unsubscribe(r.Context(), *s)
+ }
case domain.ModePublish:
- go h.hub.Publish(r.Context(), req.Topic)
+ _, err = h.topics.Publish(r.Context(), req.Topic)
+ }
+
+ if err != nil {
+ r.Clone(context.WithValue(r.Context(), "error", err))
}
w.WriteHeader(http.StatusAccepted)
case "", http.MethodGet:
tags, _, _ := language.ParseAcceptLanguage(r.Header.Get(common.HeaderAcceptLanguage))
tag, _, _ := h.matcher.Match(tags...)
- baseOf := template.NewBaseOf(tag, h.name)
-
- var page template.Page
- if r.URL.Query().Has(common.HubTopic) {
- topic, err := domain.ParseTopic(r.URL.Query().Get(common.HubTopic))
- if err != nil {
- http.Error(w, err.Error(), http.StatusBadRequest)
-
- return
- }
-
- subscriptions, err := h.subscriptions.Fetch(r.Context(), *topic)
- if err != nil {
- http.Error(w, err.Error(), http.StatusInternalServerError)
-
- return
- }
-
- page = &template.Topic{
- BaseOf: baseOf,
- Subscribers: len(subscriptions),
- }
- } else {
- page = &template.Home{BaseOf: baseOf}
- }
w.Header().Set(common.HeaderContentType, common.MIMETextHTMLCharsetUTF8)
- template.WriteTemplate(w, page)
+ template.WriteTemplate(w, &template.Home{BaseOf: template.NewBaseOf(tag, h.name)})
}
}
func NewRequest() *Request {
return &Request{
Mode: domain.ModeUnd,
- Callback: domain.Callback{},
+ Callback: nil,
Secret: domain.Secret{},
- Topic: domain.Topic{},
+ Topic: nil,
LeaseSeconds: DefaultRequestLeaseSeconds,
}
}
@@ -148,54 +149,33 @@ func (r *Request) bind(req *http.Request) error {
return fmt.Errorf("cannot parse %s: %w", common.HubMode, err)
}
+ // NOTE(toby3d): hub.topic
+ if !req.PostForm.Has(common.HubTopic) {
+ return fmt.Errorf("%s parameter is required, but not provided", common.HubTopic)
+ }
+
+ if r.Topic, err = url.Parse(req.PostForm.Get(common.HubTopic)); err != nil {
+ return fmt.Errorf("cannot parse %s: %w", common.HubTopic, err)
+ }
+
switch r.Mode {
case domain.ModePublish:
- if !req.PostForm.Has(common.HubURL) {
- return fmt.Errorf("%s parameter for %s %s is required, but not provided", common.HubURL,
- r.Mode, common.HubMode)
- }
-
- topic, err := domain.ParseTopic(req.PostForm.Get(common.HubURL))
- if err != nil {
- return fmt.Errorf("cannot parse %s: %w", common.HubTopic, err)
- }
-
- r.Topic = *topic
case domain.ModeSubscribe, domain.ModeUnsubscribe:
- for _, k := range []string{common.HubTopic, common.HubCallback} {
- if req.PostForm.Has(k) {
- continue
- }
-
- return fmt.Errorf("%s parameter is required, but not provided", k)
- }
-
- // NOTE(toby3d): hub.topic
- topic, err := domain.ParseTopic(req.PostForm.Get(common.HubTopic))
- if err != nil {
- return fmt.Errorf("cannot parse %s: %w", common.HubTopic, err)
- }
-
- r.Topic = *topic
-
// NOTE(toby3d): hub.callback
- callback, err := domain.ParseCallback(req.PostForm.Get(common.HubCallback))
- if err != nil {
+ if !req.PostForm.Has(common.HubCallback) {
+ return fmt.Errorf("%s parameter is required, but not provided", common.HubCallback)
+ }
+
+ if r.Callback, err = url.Parse(req.PostForm.Get(common.HubCallback)); err != nil {
return fmt.Errorf("cannot parse %s: %w", common.HubCallback, err)
}
- r.Callback = *callback
-
// NOTE(toby3d): hub.lease_seconds
if r.Mode != domain.ModeUnsubscribe && req.PostForm.Has(common.HubLeaseSeconds) {
- var ls uint64
- if ls, err = strconv.ParseUint(req.PostForm.Get(common.HubLeaseSeconds), 10, 64); err != nil {
+ r.LeaseSeconds, err = strconv.ParseFloat(req.PostForm.Get(common.HubLeaseSeconds), 64)
+ if err != nil {
return fmt.Errorf("cannot parse %s: %w", common.HubLeaseSeconds, err)
}
-
- if ls != 0 {
- r.LeaseSeconds = domain.NewLeaseSeconds(uint(ls))
- }
}
// NOTE(toby3d): hub.secret
@@ -218,11 +198,13 @@ func (r *Request) bind(req *http.Request) error {
return nil
}
-func (r Request) populate(s *domain.Subscription) {
+func (r Request) populate(s *domain.Subscription, ts time.Time) {
+ s.CreatedAt = ts
+ s.UpdatedAt = ts
+ s.ExpiredAt = ts.Add(time.Duration(r.LeaseSeconds) * time.Second).Round(time.Second)
s.Callback = r.Callback
- s.LeaseSeconds = r.LeaseSeconds
- s.Secret = r.Secret
s.Topic = r.Topic
+ s.Secret = r.Secret
}
func NewResponse(t domain.Topic, err error) *Response {
diff --git a/internal/hub/delivery/http/hub_http_test.go b/internal/hub/delivery/http/hub_http_test.go
index f4b2149..cfdd00d 100644
--- a/internal/hub/delivery/http/hub_http_test.go
+++ b/internal/hub/delivery/http/hub_http_test.go
@@ -2,7 +2,6 @@ package http_test
import (
"context"
- "errors"
"fmt"
"net/http"
"net/http/httptest"
@@ -10,15 +9,16 @@ import (
"strings"
"testing"
- "github.com/google/go-cmp/cmp"
"golang.org/x/text/language"
"source.toby3d.me/toby3d/hub/internal/common"
"source.toby3d.me/toby3d/hub/internal/domain"
delivery "source.toby3d.me/toby3d/hub/internal/hub/delivery/http"
- ucase "source.toby3d.me/toby3d/hub/internal/hub/usecase"
- "source.toby3d.me/toby3d/hub/internal/subscription"
+ hubucase "source.toby3d.me/toby3d/hub/internal/hub/usecase"
subscriptionmemoryrepo "source.toby3d.me/toby3d/hub/internal/subscription/repository/memory"
+ subscriptionucase "source.toby3d.me/toby3d/hub/internal/subscription/usecase"
+ topicmemoryrepo "source.toby3d.me/toby3d/hub/internal/topic/repository/memory"
+ topicucase "source.toby3d.me/toby3d/hub/internal/topic/usecase"
)
func TestHandler_ServeHTTP_Subscribe(t *testing.T) {
@@ -31,22 +31,27 @@ func TestHandler_ServeHTTP_Subscribe(t *testing.T) {
in := domain.TestSubscription(t, srv.URL+"/lipsum")
subscriptions := subscriptionmemoryrepo.NewMemorySubscriptionRepository()
- hub := ucase.NewHubUseCase(subscriptions, srv.Client(), &url.URL{Scheme: "https", Host: "hub.exmaple.com"})
+ topics := topicmemoryrepo.NewMemoryTopicRepository()
+ hub := hubucase.NewHubUseCase(topics, subscriptions, srv.Client(), &url.URL{
+ Scheme: "https",
+ Host: "hub.exmaple.com",
+ Path: "/",
+ })
payload := make(url.Values)
domain.ModeSubscribe.AddQuery(payload)
in.AddQuery(payload)
- req := httptest.NewRequest(http.MethodPost, "https://hub.example.com/",
- strings.NewReader(payload.Encode()))
+ req := httptest.NewRequest(http.MethodPost, "https://hub.example.com/", strings.NewReader(payload.Encode()))
req.Header.Set(common.HeaderContentType, common.MIMEApplicationFormCharsetUTF8)
w := httptest.NewRecorder()
delivery.NewHandler(delivery.NewHandlerParams{
Hub: hub,
- Subscriptions: subscriptions,
+ Subscriptions: subscriptionucase.NewSubscriptionUseCase(subscriptions, topics),
+ Topics: topicucase.NewTopicUseCase(topics, srv.Client()),
Matcher: language.NewMatcher([]language.Tag{language.English}),
- Name: "hub",
+ Name: "WebSub",
}).ServeHTTP(w, req)
resp := w.Result()
@@ -54,49 +59,45 @@ func TestHandler_ServeHTTP_Subscribe(t *testing.T) {
if expect := http.StatusAccepted; resp.StatusCode != expect {
t.Errorf("%s %s = %d, want %d", req.Method, req.RequestURI, resp.StatusCode, expect)
}
-
- out, err := subscriptions.Get(context.Background(), in.SUID())
- if err != nil {
- t.Fatal(err)
- }
-
- if diff := cmp.Diff(out, in, cmp.AllowUnexported(domain.Secret{}, domain.Callback{}, domain.Topic{},
- domain.LeaseSeconds{})); diff != "" {
- t.Error(diff)
- }
}
func TestHandler_ServeHTTP_Unsubscribe(t *testing.T) {
t.Parallel()
srv := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ w.Header().Set(common.HeaderContentType, common.MIMETextPlainCharsetUTF8)
fmt.Fprint(w, r.URL.Query().Get(common.HubChallenge))
}))
t.Cleanup(srv.Close)
in := domain.TestSubscription(t, srv.URL+"/lipsum")
subscriptions := subscriptionmemoryrepo.NewMemorySubscriptionRepository()
+ topics := topicmemoryrepo.NewMemoryTopicRepository()
if err := subscriptions.Create(context.Background(), in.SUID(), *in); err != nil {
t.Fatal(err)
}
- hub := ucase.NewHubUseCase(subscriptions, srv.Client(), &url.URL{Scheme: "https", Host: "hub.exmaple.com"})
+ hub := hubucase.NewHubUseCase(topics, subscriptions, srv.Client(), &url.URL{
+ Scheme: "https",
+ Host: "hub.exmaple.com",
+ Path: "/",
+ })
payload := make(url.Values)
domain.ModeUnsubscribe.AddQuery(payload)
in.AddQuery(payload)
- req := httptest.NewRequest(http.MethodPost, "https://hub.example.com/",
- strings.NewReader(payload.Encode()))
+ req := httptest.NewRequest(http.MethodPost, "https://hub.example.com/", strings.NewReader(payload.Encode()))
req.Header.Set(common.HeaderContentType, common.MIMEApplicationFormCharsetUTF8)
w := httptest.NewRecorder()
delivery.NewHandler(delivery.NewHandlerParams{
Hub: hub,
- Subscriptions: subscriptions,
+ Subscriptions: subscriptionucase.NewSubscriptionUseCase(subscriptions, topics),
+ Topics: topicucase.NewTopicUseCase(topics, srv.Client()),
Matcher: language.NewMatcher([]language.Tag{language.English}),
- Name: "hub",
+ Name: "WebSub",
}).ServeHTTP(w, req)
resp := w.Result()
@@ -104,8 +105,4 @@ func TestHandler_ServeHTTP_Unsubscribe(t *testing.T) {
if expect := http.StatusAccepted; resp.StatusCode != expect {
t.Errorf("%s %s = %d, want %d", req.Method, req.RequestURI, resp.StatusCode, expect)
}
-
- if _, err := subscriptions.Get(context.Background(), in.SUID()); !errors.Is(err, subscription.ErrNotExist) {
- t.Errorf("want %s, got %s", subscription.ErrNotExist, err)
- }
}
diff --git a/internal/hub/usecase.go b/internal/hub/usecase.go
index 4dcb873..a3f0d6a 100644
--- a/internal/hub/usecase.go
+++ b/internal/hub/usecase.go
@@ -8,9 +8,8 @@ import (
)
type UseCase interface {
- Subscribe(ctx context.Context, subscription domain.Subscription) (bool, error)
- Unsubscribe(ctx context.Context, subscription domain.Subscription) (bool, error)
- Publish(ctx context.Context, t domain.Topic) error
+ Verify(ctx context.Context, subscription domain.Subscription, mode domain.Mode) (bool, error)
+ ListenAndServe(ctx context.Context) error
}
var (
diff --git a/internal/hub/usecase/hub_ucase.go b/internal/hub/usecase/hub_ucase.go
index 5e4eef3..f5cbb4a 100644
--- a/internal/hub/usecase/hub_ucase.go
+++ b/internal/hub/usecase/hub_ucase.go
@@ -3,21 +3,26 @@ package usecase
import (
"bytes"
"context"
- "errors"
+ "crypto/hmac"
+ "encoding/hex"
"fmt"
"io"
"math/rand"
"net/http"
"net/url"
+ "strconv"
+ "time"
"source.toby3d.me/toby3d/hub/internal/common"
"source.toby3d.me/toby3d/hub/internal/domain"
"source.toby3d.me/toby3d/hub/internal/hub"
"source.toby3d.me/toby3d/hub/internal/subscription"
+ "source.toby3d.me/toby3d/hub/internal/topic"
)
type hubUseCase struct {
subscriptions subscription.Repository
+ topics topic.Repository
client *http.Client
self *url.URL
}
@@ -27,11 +32,12 @@ const (
lengthMax = 32
)
-func NewHubUseCase(subscriptions subscription.Repository, client *http.Client, self *url.URL) hub.UseCase {
+func NewHubUseCase(t topic.Repository, s subscription.Repository, c *http.Client, u *url.URL) hub.UseCase {
return &hubUseCase{
- subscriptions: subscriptions,
- client: client,
- self: self,
+ client: c,
+ self: u,
+ topics: t,
+ subscriptions: s,
}
}
@@ -41,15 +47,15 @@ func (ucase *hubUseCase) Verify(ctx context.Context, s domain.Subscription, mode
return false, fmt.Errorf("cannot generate hub.challenge: %w", err)
}
- u := s.Callback.URL()
+ u, _ := url.Parse(s.Callback.String())
q := u.Query()
- for _, w := range []domain.QueryAdder{mode, s.Topic, challenge} {
- w.AddQuery(q)
- }
+ mode.AddQuery(q)
+ q.Add(common.HubTopic, s.Topic.String())
+ challenge.AddQuery(q)
if mode == domain.ModeSubscribe {
- s.LeaseSeconds.AddQuery(q)
+ q.Add(common.HubLeaseSeconds, strconv.FormatFloat(s.LeaseSeconds(), 'g', 0, 64))
}
u.RawQuery = q.Encode()
@@ -77,118 +83,104 @@ func (ucase *hubUseCase) Verify(ctx context.Context, s domain.Subscription, mode
return false, fmt.Errorf("cannot verify subscriber response body: %w", err)
}
- if !challenge.Equal(body) {
+ if !challenge.Equal(string(body)) {
return false, fmt.Errorf("%w: got '%s', want '%s'", hub.ErrChallenge, body, *challenge)
}
return true, nil
}
-func (ucase *hubUseCase) Subscribe(ctx context.Context, s domain.Subscription) (bool, error) {
- var err error
- if _, err = ucase.Verify(ctx, s, domain.ModeSubscribe); err != nil {
- return false, fmt.Errorf("cannot validate subscription request: %w", err)
- }
+func (ucase *hubUseCase) ListenAndServe(ctx context.Context) error {
+ ticker := time.NewTicker(time.Second)
+ defer ticker.Stop()
- suid := s.SUID()
+ for ts := range ticker.C {
+ ts = ts.Round(time.Second)
- if _, err = ucase.subscriptions.Get(ctx, suid); err != nil {
- if !errors.Is(err, subscription.ErrNotExist) {
- return false, fmt.Errorf("cannot check exists subscriptions: %w", err)
+ topics, err := ucase.topics.Fetch(ctx)
+ if err != nil {
+ return fmt.Errorf("cannot fetch topics: %w", err)
}
- if err = ucase.subscriptions.Create(ctx, suid, s); err != nil {
- return false, fmt.Errorf("cannot create a new subscription: %w", err)
+ for i := range topics {
+ subscriptions, err := ucase.subscriptions.Fetch(ctx, &topics[i])
+ if err != nil {
+ return fmt.Errorf("cannot fetch subscriptions: %w", err)
+ }
+
+ for j := range subscriptions {
+ if subscriptions[j].Expired(ts) {
+ if err = ucase.subscriptions.Delete(ctx, subscriptions[j].SUID()); err != nil {
+ return fmt.Errorf("cannot remove expired subcription: %w", err)
+ }
+
+ continue
+ }
+
+ if subscriptions[j].Synced(topics[i]) {
+ continue
+ }
+
+ go ucase.push(ctx, subscriptions[j], topics[i], ts)
+ }
}
-
- return true, nil
- }
-
- if err = ucase.subscriptions.Update(ctx, suid, func(buf *domain.Subscription) (*domain.Subscription, error) {
- buf.LeaseSeconds = s.LeaseSeconds
- buf.Secret = s.Secret
-
- return buf, nil
- }); err != nil {
- return false, fmt.Errorf("cannot update subscription: %w", err)
- }
-
- return false, nil
-}
-
-func (ucase *hubUseCase) Unsubscribe(ctx context.Context, s domain.Subscription) (bool, error) {
- var err error
- if _, err = ucase.Verify(ctx, s, domain.ModeUnsubscribe); err != nil {
- return false, fmt.Errorf("cannot validate unsubscription request: %w", err)
- }
-
- if err = ucase.subscriptions.Delete(ctx, s.SUID()); err != nil {
- return false, fmt.Errorf("cannot remove subscription: %w", err)
- }
-
- return true, nil
-}
-
-func (ucase *hubUseCase) Publish(ctx context.Context, t domain.Topic) error {
- resp, err := ucase.client.Get(t.String())
- if err != nil {
- return fmt.Errorf("cannot fetch topic payload for publishing: %w", err)
- }
-
- push := domain.Push{ContentType: resp.Header.Get(common.HeaderContentType)}
-
- canonicalTopic, err := domain.ParseTopic(resp.Request.URL.String())
- if err != nil {
- return fmt.Errorf("cannot parse canonical topic URL: %w", err)
- }
-
- push.Self = *canonicalTopic
-
- if push.Content, err = io.ReadAll(resp.Body); err != nil {
- return fmt.Errorf("cannot read topic body: %w", err)
- }
-
- subscriptions, err := ucase.subscriptions.Fetch(ctx, t)
- if err != nil {
- return fmt.Errorf("cannot fetch subscriptions for topic: %w", err)
- }
-
- for i := range subscriptions {
- ucase.Push(ctx, push, subscriptions[i])
}
return nil
}
-func (ucase *hubUseCase) Push(ctx context.Context, p domain.Push, s domain.Subscription) (bool, error) {
- req, err := http.NewRequest(http.MethodPost, s.Callback.String(), bytes.NewReader(p.Content))
+func (ucase *hubUseCase) push(ctx context.Context, s domain.Subscription, t domain.Topic, ts time.Time) (bool, error) {
+ req, err := http.NewRequest(http.MethodPost, s.Callback.String(), bytes.NewReader(t.Content))
if err != nil {
- return false, fmt.Errorf("cannot build push request: %w", err)
+ return false, fmt.Errorf("cannot build request: %w", err)
}
- req.Header.Set(common.HeaderContentType, p.ContentType)
- req.Header.Set(common.HeaderLink, `<`+ucase.self.String()+`>; rel="hub", <`+p.Self.String()+`>; rel="self"`)
- p.SetXHubSignatureHeader(req, domain.AlgorithmSHA512, s.Secret)
+ req.Header.Set(common.HeaderContentType, t.ContentType)
+ req.Header.Set(common.HeaderLink, `<`+ucase.self.String()+`>; rel="hub", <`+s.Topic.String()+`>; rel="self"`)
+ setXHubSignatureHeader(req, domain.AlgorithmSHA512, s.Secret, t.Content)
resp, err := ucase.client.Do(req)
if err != nil {
return false, fmt.Errorf("cannot push: %w", err)
}
- // The subscriber's callback URL MAY return an HTTP 410 code to indicate that the subscription has been
- // deleted, and the hub MAY terminate the subscription if it receives that code as a response.
+ suid := s.SUID()
+
+ // The subscriber's callback URL MAY return an HTTP 410 code to indicate
+ // that the subscription has been deleted, and the hub MAY terminate the
+ // subscription if it receives that code as a response.
if resp.StatusCode == http.StatusGone {
- if err = ucase.subscriptions.Delete(ctx, s.SUID()); err != nil {
+ if err = ucase.subscriptions.Delete(ctx, suid); err != nil {
return false, fmt.Errorf("cannot remove deleted subscription: %w", err)
}
return true, nil
}
- // The subscriber's callback URL MUST return an HTTP 2xx response code to indicate a success.
+ // The subscriber's callback URL MUST return an HTTP 2xx response code
+ // to indicate a success.
if resp.StatusCode < http.StatusOK || resp.StatusCode >= http.StatusMultipleChoices {
return false, hub.ErrStatus
}
+ if err = ucase.subscriptions.Update(ctx, suid, func(tx *domain.Subscription) (*domain.Subscription, error) {
+ tx.SyncedAt = t.UpdatedAt
+
+ return tx, nil
+ }); err != nil {
+ return false, fmt.Errorf("cannot sync sybsciption status: %w", err)
+ }
+
return true, nil
}
+
+func setXHubSignatureHeader(req *http.Request, alg domain.Algorithm, secret domain.Secret, body []byte) {
+ if !secret.IsSet() || alg == domain.AlgorithmUnd {
+ return
+ }
+
+ h := hmac.New(alg.Hash, []byte(secret.String()))
+ h.Write(body)
+
+ req.Header.Set(common.HeaderXHubSignature, alg.String()+"="+hex.EncodeToString(h.Sum(nil)))
+}
diff --git a/internal/hub/usecase/hub_ucase_test.go b/internal/hub/usecase/hub_ucase_test.go
new file mode 100644
index 0000000..eefd117
--- /dev/null
+++ b/internal/hub/usecase/hub_ucase_test.go
@@ -0,0 +1,43 @@
+package usecase_test
+
+import (
+ "context"
+ "fmt"
+ "net/http"
+ "net/http/httptest"
+ "net/url"
+ "testing"
+
+ "source.toby3d.me/toby3d/hub/internal/common"
+ "source.toby3d.me/toby3d/hub/internal/domain"
+ hubucase "source.toby3d.me/toby3d/hub/internal/hub/usecase"
+ subscriptionmemoryrepo "source.toby3d.me/toby3d/hub/internal/subscription/repository/memory"
+ topicmemoryrepo "source.toby3d.me/toby3d/hub/internal/topic/repository/memory"
+)
+
+func TestHubUseCase_Verify(t *testing.T) {
+ t.Parallel()
+
+ srv := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ w.Header().Set(common.HeaderContentType, common.MIMETextPlainCharsetUTF8)
+ fmt.Fprint(w, r.FormValue(common.HubChallenge))
+ }))
+ t.Cleanup(srv.Close)
+
+ subscriptions := subscriptionmemoryrepo.NewMemorySubscriptionRepository()
+ topics := topicmemoryrepo.NewMemoryTopicRepository()
+ subscription := domain.TestSubscription(t, srv.URL)
+
+ ok, err := hubucase.NewHubUseCase(topics, subscriptions, srv.Client(), &url.URL{
+ Scheme: "https",
+ Host: "hub.example.com",
+ Path: "/",
+ }).Verify(context.Background(), *subscription, domain.ModeSubscribe)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ if !ok {
+ t.Errorf("want %t, got %t", true, ok)
+ }
+}
diff --git a/internal/middleware/logfmt.go b/internal/middleware/logfmt.go
index 3d95444..bcedabb 100644
--- a/internal/middleware/logfmt.go
+++ b/internal/middleware/logfmt.go
@@ -68,6 +68,7 @@ func LogFmtWithConfig(config LogFmtConfig) Interceptor {
next(rw, r)
+ rw.error, _ = r.Context().Value("error").(error)
end := time.Now().UTC()
encoder.EncodeKeyvals(
diff --git a/internal/subscription/repository.go b/internal/subscription/repository.go
index 83b28bf..f5ecbe0 100644
--- a/internal/subscription/repository.go
+++ b/internal/subscription/repository.go
@@ -13,7 +13,7 @@ type (
Repository interface {
Create(ctx context.Context, suid domain.SUID, subscription domain.Subscription) error
Get(ctx context.Context, suid domain.SUID) (*domain.Subscription, error)
- Fetch(ctx context.Context, topic domain.Topic) ([]domain.Subscription, error)
+ Fetch(ctx context.Context, topic *domain.Topic) ([]domain.Subscription, error)
Update(ctx context.Context, suid domain.SUID, update UpdateFunc) error
Delete(ctx context.Context, suid domain.SUID) error
}
diff --git a/internal/subscription/repository/memory/memory_subscription.go b/internal/subscription/repository/memory/memory_subscription.go
index 5e00cc6..273353e 100644
--- a/internal/subscription/repository/memory/memory_subscription.go
+++ b/internal/subscription/repository/memory/memory_subscription.go
@@ -15,10 +15,15 @@ type memorySubscriptionRepository struct {
subscriptions map[domain.SUID]domain.Subscription
}
-// Create implements subscription.Repository
+func NewMemorySubscriptionRepository() subscription.Repository {
+ return &memorySubscriptionRepository{
+ mutex: new(sync.RWMutex),
+ subscriptions: make(map[domain.SUID]domain.Subscription),
+ }
+}
+
func (repo *memorySubscriptionRepository) Create(ctx context.Context, suid domain.SUID, s domain.Subscription) error {
- _, err := repo.Get(ctx, suid)
- if err != nil {
+ if _, err := repo.Get(ctx, suid); err != nil {
if !errors.Is(err, subscription.ErrNotExist) {
return fmt.Errorf("cannot create subscription: %w", err)
}
@@ -34,7 +39,6 @@ func (repo *memorySubscriptionRepository) Create(ctx context.Context, suid domai
return nil
}
-// Delete implements subscription.Repository
func (repo *memorySubscriptionRepository) Delete(ctx context.Context, suid domain.SUID) error {
if _, err := repo.Get(ctx, suid); err != nil {
if !errors.Is(err, subscription.ErrNotExist) {
@@ -52,7 +56,6 @@ func (repo *memorySubscriptionRepository) Delete(ctx context.Context, suid domai
return nil
}
-// Get implements subscription.Repository
func (repo *memorySubscriptionRepository) Get(_ context.Context, suid domain.SUID) (*domain.Subscription, error) {
repo.mutex.RLock()
defer repo.mutex.RUnlock()
@@ -64,14 +67,14 @@ func (repo *memorySubscriptionRepository) Get(_ context.Context, suid domain.SUI
return nil, subscription.ErrNotExist
}
-func (repo *memorySubscriptionRepository) Fetch(ctx context.Context, t domain.Topic) ([]domain.Subscription, error) {
+func (repo *memorySubscriptionRepository) Fetch(ctx context.Context, t *domain.Topic) ([]domain.Subscription, error) {
repo.mutex.RLock()
defer repo.mutex.RUnlock()
out := make([]domain.Subscription, 0)
for _, s := range repo.subscriptions {
- if !s.Topic.Equal(t) {
+ if t != nil && t.Self.String() != s.Topic.String() {
continue
}
@@ -100,10 +103,3 @@ func (repo *memorySubscriptionRepository) Update(ctx context.Context, suid domai
return nil
}
-
-func NewMemorySubscriptionRepository() subscription.Repository {
- return &memorySubscriptionRepository{
- mutex: new(sync.RWMutex),
- subscriptions: make(map[domain.SUID]domain.Subscription),
- }
-}
diff --git a/internal/subscription/usecase.go b/internal/subscription/usecase.go
index 6910665..725ca9f 100644
--- a/internal/subscription/usecase.go
+++ b/internal/subscription/usecase.go
@@ -7,5 +7,6 @@ import (
)
type UseCase interface {
- Fetch(ctx context.Context, topic domain.Topic) ([]domain.Subscription, error)
+ Subscribe(ctx context.Context, s domain.Subscription) (bool, error)
+ Unsubscribe(ctx context.Context, s domain.Subscription) (bool, error)
}
diff --git a/internal/subscription/usecase/subscription_ucase.go b/internal/subscription/usecase/subscription_ucase.go
index f4f49e1..727ca10 100644
--- a/internal/subscription/usecase/subscription_ucase.go
+++ b/internal/subscription/usecase/subscription_ucase.go
@@ -2,27 +2,94 @@ package usecase
import (
"context"
+ "errors"
"fmt"
+ "io"
+ "net/http"
+ "time"
+ "source.toby3d.me/toby3d/hub/internal/common"
"source.toby3d.me/toby3d/hub/internal/domain"
"source.toby3d.me/toby3d/hub/internal/subscription"
+ "source.toby3d.me/toby3d/hub/internal/topic"
)
type subscriptionUseCase struct {
+ topics topic.Repository
subscriptions subscription.Repository
+ client *http.Client
}
-func NewSubscriptionUseCase(subscriptions subscription.Repository) subscription.UseCase {
+func NewSubscriptionUseCase(subs subscription.Repository, tops topic.Repository, c *http.Client) subscription.UseCase {
return &subscriptionUseCase{
- subscriptions: subscriptions,
+ subscriptions: subs,
+ topics: tops,
+ client: c,
}
}
-func (ucase *subscriptionUseCase) Fetch(ctx context.Context, topic domain.Topic) ([]domain.Subscription, error) {
- out, err := ucase.subscriptions.Fetch(ctx, topic)
- if err != nil {
- return nil, fmt.Errorf("cannot fetch subscriptions for topic: %w", err)
+func (ucase *subscriptionUseCase) Subscribe(ctx context.Context, s domain.Subscription) (bool, error) {
+ now := time.Now().UTC().Round(time.Second)
+
+ if _, err := ucase.topics.Get(context.Background(), s.Topic); err != nil {
+ if !errors.Is(err, topic.ErrNotExist) {
+ return false, fmt.Errorf("cannot check subscription topic: %w", err)
+ }
+
+ resp, err := ucase.client.Get(s.Topic.String())
+ if err != nil {
+ return false, fmt.Errorf("cannot fetch a new topic subscription content: %w", err)
+ }
+
+ content, err := io.ReadAll(resp.Body)
+ if err != nil {
+ return false, fmt.Errorf("cannot read a new topic subscription content: %w", err)
+ }
+
+ if err = ucase.topics.Create(ctx, s.Topic, domain.Topic{
+ CreatedAt: now,
+ UpdatedAt: now,
+ Self: s.Topic,
+ ContentType: resp.Header.Get(common.HeaderContentType),
+ Content: content,
+ }); err != nil {
+ return false, fmt.Errorf("cannot create topic for subsciption: %w", err)
+ }
}
- return out, nil
+ if err := ucase.subscriptions.Create(ctx, s.SUID(), domain.Subscription{
+ CreatedAt: now,
+ UpdatedAt: now,
+ SyncedAt: now,
+ ExpiredAt: s.ExpiredAt,
+ Callback: s.Callback,
+ Topic: s.Topic,
+ Secret: s.Secret,
+ }); err != nil {
+ if !errors.Is(err, subscription.ErrExist) {
+ return false, fmt.Errorf("cannot create a new subscription: %w", err)
+ }
+
+ if err = ucase.subscriptions.Update(ctx, s.SUID(), func(tx *domain.Subscription) (*domain.Subscription,
+ error,
+ ) {
+ tx.UpdatedAt = now
+ tx.ExpiredAt = now.Add(time.Duration(s.LeaseSeconds()) * time.Second)
+ tx.Secret = s.Secret
+
+ return tx, nil
+ }); err != nil {
+ return false, fmt.Errorf("cannot resubscribe existing subscription: %w", err)
+ }
+ }
+
+ return true, nil
+}
+
+func (ucase *subscriptionUseCase) Unsubscribe(ctx context.Context, s domain.Subscription) (bool, error) {
+ if err := ucase.subscriptions.Delete(ctx, s.SUID()); err != nil {
+ return false, fmt.Errorf("cannot unsubscribe: %w", err)
+ }
+
+ return true, nil
}
diff --git a/internal/subscription/usecase/subscription_ucase_test.go b/internal/subscription/usecase/subscription_ucase_test.go
new file mode 100644
index 0000000..8e7c42e
--- /dev/null
+++ b/internal/subscription/usecase/subscription_ucase_test.go
@@ -0,0 +1,73 @@
+package usecase_test
+
+import (
+ "context"
+ "testing"
+
+ "source.toby3d.me/toby3d/hub/internal/domain"
+ subscriptionmemoryrepo "source.toby3d.me/toby3d/hub/internal/subscription/repository/memory"
+ "source.toby3d.me/toby3d/hub/internal/subscription/usecase"
+ topicmemoryrepo "source.toby3d.me/toby3d/hub/internal/topic/repository/memory"
+)
+
+func TestSubscriptionUseCase_Subscribe(t *testing.T) {
+ t.Parallel()
+
+ subscription := domain.TestSubscription(t, "https://example.com/")
+ topics := topicmemoryrepo.NewMemoryTopicRepository()
+ subscriptions := subscriptionmemoryrepo.NewMemorySubscriptionRepository()
+
+ ucase := usecase.NewSubscriptionUseCase(subscriptions, topics)
+
+ ok, err := ucase.Subscribe(context.Background(), *subscription)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ if !ok {
+ t.Errorf("want %t, got %t", true, ok)
+ }
+
+ if _, err := subscriptions.Get(context.Background(), subscription.SUID()); err != nil {
+ t.Fatal(err)
+ }
+
+ t.Run("resubscribe", func(t *testing.T) {
+ t.Parallel()
+
+ ok, err := ucase.Subscribe(context.Background(), *subscription)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ if !ok {
+ t.Errorf("want %t, got %t", true, ok)
+ }
+ })
+}
+
+func TestSubscriptionUseCase_Unsubscribe(t *testing.T) {
+ t.Parallel()
+
+ subscription := domain.TestSubscription(t, "https://example.com/")
+ topics := topicmemoryrepo.NewMemoryTopicRepository()
+ subscriptions := subscriptionmemoryrepo.NewMemorySubscriptionRepository()
+
+ if err := subscriptions.Create(context.Background(), subscription.SUID(), *subscription); err != nil {
+ t.Fatal(err)
+ }
+
+ ok, err := usecase.NewSubscriptionUseCase(subscriptions, topics).
+ Unsubscribe(context.Background(), *subscription)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ if !ok {
+ t.Errorf("want %t, got %t", true, ok)
+ }
+
+ if _, err := subscriptions.Get(context.Background(), subscription.SUID()); err == nil {
+ t.Error("want error, got nil")
+ }
+}
diff --git a/internal/topic/repository.go b/internal/topic/repository.go
new file mode 100644
index 0000000..e8371d4
--- /dev/null
+++ b/internal/topic/repository.go
@@ -0,0 +1,25 @@
+package topic
+
+import (
+ "context"
+ "errors"
+ "net/url"
+
+ "source.toby3d.me/toby3d/hub/internal/domain"
+)
+
+type (
+ UpdateFunc func(t *domain.Topic) (*domain.Topic, error)
+
+ Repository interface {
+ Create(ctx context.Context, u *url.URL, topic domain.Topic) error
+ Update(ctx context.Context, u *url.URL, update UpdateFunc) error
+ Fetch(ctx context.Context) ([]domain.Topic, error)
+ Get(ctx context.Context, u *url.URL) (*domain.Topic, error)
+ }
+)
+
+var (
+ ErrExist = errors.New("topic already exists")
+ ErrNotExist = errors.New("topic does not exist")
+)
diff --git a/internal/topic/repository/memory/memory_topic.go b/internal/topic/repository/memory/memory_topic.go
new file mode 100644
index 0000000..94eeec5
--- /dev/null
+++ b/internal/topic/repository/memory/memory_topic.go
@@ -0,0 +1,85 @@
+package memory
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "net/url"
+ "sync"
+
+ "source.toby3d.me/toby3d/hub/internal/domain"
+ "source.toby3d.me/toby3d/hub/internal/topic"
+)
+
+type memoryTopicRepository struct {
+ mutex *sync.RWMutex
+ topics map[string]domain.Topic
+}
+
+func NewMemoryTopicRepository() topic.Repository {
+ return &memoryTopicRepository{
+ mutex: new(sync.RWMutex),
+ topics: make(map[string]domain.Topic),
+ }
+}
+
+func (repo *memoryTopicRepository) Update(ctx context.Context, u *url.URL, update topic.UpdateFunc) error {
+ tx, err := repo.Get(ctx, u)
+ if err != nil {
+ return fmt.Errorf("cannot find updating topic: %w", err)
+ }
+
+ repo.mutex.Lock()
+ defer repo.mutex.Unlock()
+
+ result, err := update(tx)
+ if err != nil {
+ return fmt.Errorf("cannot update topic: %w", err)
+ }
+
+ repo.topics[u.String()] = *result
+
+ return nil
+}
+
+func (repo *memoryTopicRepository) Create(ctx context.Context, u *url.URL, t domain.Topic) error {
+ _, err := repo.Get(ctx, u)
+ if err != nil && !errors.Is(err, topic.ErrNotExist) {
+ return fmt.Errorf("cannot get topic: %w", err)
+ }
+
+ if err == nil {
+ return topic.ErrExist
+ }
+
+ repo.mutex.Lock()
+ defer repo.mutex.Unlock()
+
+ repo.topics[u.String()] = t
+
+ return nil
+}
+
+func (repo *memoryTopicRepository) Get(ctx context.Context, u *url.URL) (*domain.Topic, error) {
+ repo.mutex.RLock()
+ defer repo.mutex.RUnlock()
+
+ if out, ok := repo.topics[u.String()]; ok {
+ return &out, nil
+ }
+
+ return nil, topic.ErrNotExist
+}
+
+func (repo *memoryTopicRepository) Fetch(_ context.Context) ([]domain.Topic, error) {
+ repo.mutex.RLock()
+ defer repo.mutex.RUnlock()
+
+ out := make([]domain.Topic, 0)
+
+ for _, t := range repo.topics {
+ out = append(out, t)
+ }
+
+ return out, nil
+}
diff --git a/internal/topic/usecase.go b/internal/topic/usecase.go
new file mode 100644
index 0000000..54007b7
--- /dev/null
+++ b/internal/topic/usecase.go
@@ -0,0 +1,10 @@
+package topic
+
+import (
+ "context"
+ "net/url"
+)
+
+type UseCase interface {
+ Publish(ctx context.Context, u *url.URL) (bool, error)
+}
diff --git a/internal/topic/usecase/topic_ucase.go b/internal/topic/usecase/topic_ucase.go
new file mode 100644
index 0000000..d6edd00
--- /dev/null
+++ b/internal/topic/usecase/topic_ucase.go
@@ -0,0 +1,66 @@
+package usecase
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "io"
+ "net/http"
+ "net/url"
+ "time"
+
+ "source.toby3d.me/toby3d/hub/internal/common"
+ "source.toby3d.me/toby3d/hub/internal/domain"
+ "source.toby3d.me/toby3d/hub/internal/topic"
+)
+
+type topicUseCase struct {
+ client *http.Client
+ topics topic.Repository
+}
+
+func NewTopicUseCase(topics topic.Repository, client *http.Client) topic.UseCase {
+ return &topicUseCase{
+ topics: topics,
+ client: client,
+ }
+}
+
+func (ucase *topicUseCase) Publish(ctx context.Context, u *url.URL) (bool, error) {
+ now := time.Now().UTC().Round(time.Second)
+
+ resp, err := ucase.client.Get(u.String())
+ if err != nil {
+ return false, fmt.Errorf("cannot fetch publishing url: %w", err)
+ }
+
+ content, err := io.ReadAll(resp.Body)
+ if err != nil {
+ return false, fmt.Errorf("cannot read topic response body: %w", err)
+ }
+
+ if err := ucase.topics.Update(ctx, u, func(tx *domain.Topic) (*domain.Topic, error) {
+ tx.Self = resp.Request.URL
+ tx.UpdatedAt = now
+ tx.Content = content
+ tx.ContentType = resp.Header.Get(common.HeaderContentType)
+
+ return tx, nil
+ }); err != nil {
+ if !errors.Is(err, topic.ErrNotExist) {
+ return false, fmt.Errorf("cannot publish exists topic: %w", err)
+ }
+
+ if err = ucase.topics.Create(ctx, resp.Request.URL, domain.Topic{
+ CreatedAt: now,
+ UpdatedAt: now,
+ Self: resp.Request.URL,
+ ContentType: resp.Header.Get(common.HeaderContentType),
+ Content: content,
+ }); err != nil {
+ return false, fmt.Errorf("cannot publish a new topic: %w", err)
+ }
+ }
+
+ return true, nil
+}
diff --git a/internal/topic/usecase/topic_ucase_test.go b/internal/topic/usecase/topic_ucase_test.go
new file mode 100644
index 0000000..ece96ef
--- /dev/null
+++ b/internal/topic/usecase/topic_ucase_test.go
@@ -0,0 +1,43 @@
+package usecase_test
+
+import (
+ "context"
+ "fmt"
+ "net/http"
+ "net/http/httptest"
+ "net/url"
+ "testing"
+
+ "source.toby3d.me/toby3d/hub/internal/common"
+ "source.toby3d.me/toby3d/hub/internal/domain"
+ topicmemoryrepo "source.toby3d.me/toby3d/hub/internal/topic/repository/memory"
+ "source.toby3d.me/toby3d/hub/internal/topic/usecase"
+)
+
+func TestTopicUseCase_Publish(t *testing.T) {
+ t.Parallel()
+
+ topic := domain.TestTopic(t)
+ topics := topicmemoryrepo.NewMemoryTopicRepository()
+ srv := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
+ w.Header().Set(common.HeaderContentType, topic.ContentType)
+ fmt.Fprint(w, topic.Content)
+ }))
+ t.Cleanup(srv.Close)
+
+ topic.Self, _ = url.Parse(srv.URL + "/")
+
+ ok, err := usecase.NewTopicUseCase(topics, srv.Client()).
+ Publish(context.Background(), topic.Self)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ if !ok {
+ t.Errorf("want %t, got %t", true, ok)
+ }
+
+ if _, err := topics.Get(context.Background(), topic.Self); err != nil {
+ t.Fatal(err)
+ }
+}
diff --git a/internal/urlutil/shift_path_test.go b/internal/urlutil/shift_path_test.go
new file mode 100644
index 0000000..f26bcfa
--- /dev/null
+++ b/internal/urlutil/shift_path_test.go
@@ -0,0 +1,37 @@
+package urlutil_test
+
+import (
+ "testing"
+
+ "source.toby3d.me/toby3d/hub/internal/urlutil"
+)
+
+func TestShiftPath(t *testing.T) {
+ t.Parallel()
+
+ for name, tc := range map[string]struct {
+ input string
+ expect [2]string
+ }{
+ "empty": {input: "", expect: [2]string{"", "/"}},
+ "root": {input: "/", expect: [2]string{"", "/"}},
+ "page": {input: "/foo", expect: [2]string{"foo", "/"}},
+ "folder": {input: "/foo/bar", expect: [2]string{"foo", "/bar"}},
+ } {
+ name, tc := name, tc
+
+ t.Run(name, func(t *testing.T) {
+ t.Parallel()
+
+ head, tail := urlutil.ShiftPath(tc.input)
+
+ if head != tc.expect[0] {
+ t.Errorf("want '%s', got '%s'", tc.expect[0], head)
+ }
+
+ if tail != tc.expect[1] {
+ t.Errorf("want '%s', got '%s'", tc.expect[1], tail)
+ }
+ })
+ }
+}
diff --git a/main.go b/main.go
index 7cd7894..c570ca5 100644
--- a/main.go
+++ b/main.go
@@ -5,6 +5,7 @@
package main
import (
+ "context"
"embed"
"io/fs"
"log"
@@ -24,6 +25,8 @@ import (
"source.toby3d.me/toby3d/hub/internal/middleware"
subscriptionmemoryrepo "source.toby3d.me/toby3d/hub/internal/subscription/repository/memory"
subscriptionucase "source.toby3d.me/toby3d/hub/internal/subscription/usecase"
+ topicmemoryrepo "source.toby3d.me/toby3d/hub/internal/topic/repository/memory"
+ topicucase "source.toby3d.me/toby3d/hub/internal/topic/usecase"
"source.toby3d.me/toby3d/hub/internal/urlutil"
)
@@ -50,6 +53,8 @@ func init() {
}
func main() {
+ ctx := context.Background()
+
config := new(domain.Config)
if err := env.Parse(config, env.Options{Prefix: "HUB_"}); err != nil {
logger.Fatalln(err)
@@ -60,33 +65,41 @@ func main() {
logger.Fatalln(err)
}
- client := &http.Client{Timeout: 1 * time.Second}
+ client := &http.Client{Timeout: 5 * time.Second}
matcher := language.NewMatcher(message.DefaultCatalog.Languages())
subscriptions := subscriptionmemoryrepo.NewMemorySubscriptionRepository()
- hub := hubhttprelivery.NewHandler(hubhttprelivery.NewHandlerParams{
- Hub: hubucase.NewHubUseCase(subscriptions, client, config.BaseURL),
- Subscriptions: subscriptionucase.NewSubscriptionUseCase(subscriptions),
+ topics := topicmemoryrepo.NewMemoryTopicRepository()
+ topicService := topicucase.NewTopicUseCase(topics, client)
+ subscriptionService := subscriptionucase.NewSubscriptionUseCase(subscriptions, topics, client)
+ hubService := hubucase.NewHubUseCase(topics, subscriptions, client, config.BaseURL)
+
+ handler := hubhttprelivery.NewHandler(hubhttprelivery.NewHandlerParams{
+ Hub: hubService,
+ Subscriptions: subscriptionService,
+ Topics: topicService,
Matcher: matcher,
Name: config.Name,
})
server := &http.Server{
- Addr: ":3000",
+ Addr: config.Bind,
Handler: http.HandlerFunc(middleware.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
head, _ := urlutil.ShiftPath(r.URL.Path)
switch head {
case "":
- hub.ServeHTTP(w, r)
+ handler.ServeHTTP(w, r)
case "static":
http.FileServer(http.FS(static)).ServeHTTP(w, r)
}
}).Intercept(middleware.LogFmt())),
- ReadTimeout: 1 * time.Second,
- WriteTimeout: 1 * time.Second,
+ ReadTimeout: 5 * time.Second,
+ WriteTimeout: 5 * time.Second,
ErrorLog: logger,
}
+ go hubService.ListenAndServe(ctx)
+
if err = server.ListenAndServe(); err != nil {
logger.Fatalln(err)
}
diff --git a/web/static/manifest.webmanifest b/web/static/manifest.webmanifest
index c1736c8..8a51461 100644
--- a/web/static/manifest.webmanifest
+++ b/web/static/manifest.webmanifest
@@ -1,24 +1,16 @@
{
- "lang": "en",
- "dir": "ltr",
- "name": "WebSub",
- "description": "A dead simple WebSub hub",
- "short_name": "WebSub",
- "icons": [{
- "purpose": "maskable",
- "sizes": "192x192",
- "src": "icon-192x192.png",
- "type": "image/png"
- },{
- "purpose": "maskable",
- "sizes": "512x512",
- "src": "icon-512x512.png",
- "type": "image/png"
- }],
- "scope": "/",
- "start_url": "/",
- "display": "fullscreen",
- "orientation": "landscape",
- "theme_color": "#ff6fcf",
- "background_color": "#ff6fcf"
-}
\ No newline at end of file
+ "icons": [
+ {
+ "purpose": "maskable",
+ "sizes": "192x192",
+ "src": "icon-192x192.png",
+ "type": "image/png"
+ },
+ {
+ "purpose": "maskable",
+ "sizes": "512x512",
+ "src": "icon-512x512.png",
+ "type": "image/png"
+ }
+ ]
+}
diff --git a/web/template/baseof.qtpl b/web/template/baseof.qtpl
index dcf6b99..c388f1e 100644
--- a/web/template/baseof.qtpl
+++ b/web/template/baseof.qtpl
@@ -38,6 +38,9 @@ func NewBaseOf(lang language.Tag, name string) *BaseOf {
{% func (p *BaseOf) head() %}
{% comment %}https://evilmartians.com/chronicles/how-to-favicon-in-2021-six-files-that-fit-most-needs{% endcomment %}
+
+
@@ -48,9 +51,6 @@ func NewBaseOf(lang language.Tag, name string) *BaseOf {
-
-
{% endfunc %}
{% func (p *BaseOf) body() %}{% endfunc %}