refactor: ClickTracker service
This commit is contained in:
+13
-31
@@ -1,17 +1,7 @@
|
||||
require "uuid"
|
||||
require "user_agent_parser"
|
||||
require "async"
|
||||
|
||||
require "../lib/controller.cr"
|
||||
require "../lib/ip_lookup"
|
||||
|
||||
UserAgent.load_regexes(File.read("data/uap_core_regexes.yaml"))
|
||||
IpLookup.load_mmdb("data/GeoLite2-Country.mmdb")
|
||||
|
||||
Async.configure do |config|
|
||||
config.max_threads = 20
|
||||
config.queue_size = 1000 # Set max queue size for pending operations
|
||||
end
|
||||
|
||||
module App::Controllers::Link
|
||||
class Create < App::Lib::BaseController
|
||||
@@ -52,6 +42,9 @@ module App::Controllers::Link
|
||||
class Index < App::Lib::BaseController
|
||||
include App::Models
|
||||
include App::Lib
|
||||
include App::Services
|
||||
|
||||
ClickTracker.init
|
||||
|
||||
def call(env)
|
||||
slug = env.params.url["slug"]
|
||||
@@ -67,7 +60,6 @@ module App::Controllers::Link
|
||||
end
|
||||
raise App::NotFoundException.new(env) if !link
|
||||
|
||||
link_id = link.not_nil![:id]
|
||||
remote_address = env.request.headers["Cf-Connecting-Ip"]?.try(&.presence) || env.request.remote_address.try &.to_s
|
||||
user_agent_str = env.request.headers["User-Agent"]? || "Unknown"
|
||||
|
||||
@@ -79,29 +71,19 @@ module App::Controllers::Link
|
||||
env.response.headers["X-Forwarded-For"] = client_ip
|
||||
env.response.headers["User-Agent"] = user_agent_str
|
||||
|
||||
# The Future.execute is auto-queued
|
||||
Async::Future.execute do
|
||||
ip_lookup = client_ip != "Unknown" ? IpLookup.new(client_ip) : nil
|
||||
country = ip_lookup.try &.country.try &.code
|
||||
|
||||
user_agent = user_agent_str != "Unknown" ? UserAgent.new(user_agent_str) : nil
|
||||
spawn do
|
||||
link_id = link.not_nil![:id]
|
||||
|
||||
source = env.params.query["utm_source"]? || "Direct"
|
||||
referer_host = env.request.headers["Referer"]?.try { |r| begin URI.parse(r).host rescue r end } || source
|
||||
referer = env.request.headers["Referer"]?.try { |r| begin URI.parse(r).host rescue r end } || source
|
||||
|
||||
click = Click.new
|
||||
click.id = UUID.v4.to_s
|
||||
click.link_id = link_id
|
||||
click.country = country
|
||||
click.user_agent = user_agent_str
|
||||
click.browser = user_agent.try &.family
|
||||
click.os = user_agent.try &.os.try &.family
|
||||
click.referer = referer_host
|
||||
|
||||
changeset = Database.insert(click)
|
||||
if changeset.errors.any?
|
||||
Log.error { "Logging click event failed: #{changeset.errors}" }
|
||||
end
|
||||
ClickTracker.track(
|
||||
link_id: link.not_nil![:id],
|
||||
client_ip: client_ip,
|
||||
user_agent: user_agent_str,
|
||||
source: source,
|
||||
referer: referer
|
||||
)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
@@ -0,0 +1,55 @@
|
||||
require "../lib/*"
|
||||
require "../models/*"
|
||||
|
||||
UserAgent.load_regexes(File.read("data/uap_core_regexes.yaml"))
|
||||
IpLookup.load_mmdb("data/GeoLite2-Country.mmdb")
|
||||
|
||||
module App::Services
|
||||
class ClickTracker
|
||||
@@queue = Channel(Tuple(String, String, String, String, String)).new(1000)
|
||||
@@initialized = false
|
||||
|
||||
def self.init
|
||||
return if @@initialized
|
||||
@@initialized = true
|
||||
|
||||
# Just use a single worker fiber to process the queue
|
||||
spawn do
|
||||
Log.info { "ClickTracker worker started" }
|
||||
loop do
|
||||
begin
|
||||
link_id, client_ip, user_agent_str, source, referer = @@queue.receive
|
||||
|
||||
ip_lookup = client_ip != "Unknown" ? IpLookup.new(client_ip) : nil
|
||||
country = ip_lookup.try &.country.try &.code
|
||||
|
||||
user_agent = user_agent_str != "Unknown" ? UserAgent.new(user_agent_str) : nil
|
||||
|
||||
click = App::Models::Click.new
|
||||
click.id = UUID.v4.to_s
|
||||
click.link_id = link_id
|
||||
click.country = country
|
||||
click.user_agent = user_agent_str
|
||||
click.browser = user_agent.try &.family
|
||||
click.os = user_agent.try &.os.try &.family
|
||||
click.referer = referer
|
||||
|
||||
changeset = App::Lib::Database.insert(click)
|
||||
if changeset.errors.any?
|
||||
Log.error { "Logging click event failed: #{changeset.errors}" }
|
||||
end
|
||||
rescue ex
|
||||
Log.error { "Error processing click: #{ex.message}" }
|
||||
sleep 0.1
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
def self.track(link_id : String, client_ip : String, user_agent : String, source : String, referer : String)
|
||||
init if !@@initialized
|
||||
|
||||
@@queue.send({link_id, client_ip, user_agent, source, referer})
|
||||
end
|
||||
end
|
||||
end
|
||||
Reference in New Issue
Block a user