Use to_json call for raw event strings (#38215)
This commit is contained in:
@@ -24,6 +24,6 @@ module AccessTokenExtension
|
|||||||
end
|
end
|
||||||
|
|
||||||
def push_to_streaming_api
|
def push_to_streaming_api
|
||||||
redis.publish("timeline:access_token:#{id}", Oj.dump(event: :kill)) if revoked? || destroyed?
|
redis.publish("timeline:access_token:#{id}", { event: :kill }.to_json) if revoked? || destroyed?
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|||||||
@@ -35,7 +35,7 @@ module ApplicationExtension
|
|||||||
|
|
||||||
def close_streaming_sessions(resource_owner = nil)
|
def close_streaming_sessions(resource_owner = nil)
|
||||||
# TODO: #28793 Combine into a single topic
|
# TODO: #28793 Combine into a single topic
|
||||||
payload = Oj.dump(event: :kill)
|
payload = { event: :kill }.to_json
|
||||||
scope = access_tokens
|
scope = access_tokens
|
||||||
scope = scope.where(resource_owner_id: resource_owner.id) unless resource_owner.nil?
|
scope = scope.where(resource_owner_id: resource_owner.id) unless resource_owner.nil?
|
||||||
scope.in_batches do |tokens|
|
scope.in_batches do |tokens|
|
||||||
|
|||||||
@@ -90,7 +90,7 @@ class FeedManager
|
|||||||
def unpush_from_home(account, status, update: false)
|
def unpush_from_home(account, status, update: false)
|
||||||
return false unless remove_from_feed(:home, account.id, status, aggregate_reblogs: account.user&.aggregates_reblogs?)
|
return false unless remove_from_feed(:home, account.id, status, aggregate_reblogs: account.user&.aggregates_reblogs?)
|
||||||
|
|
||||||
redis.publish("timeline:#{account.id}", Oj.dump(event: :delete, payload: status.id.to_s)) unless update
|
redis.publish("timeline:#{account.id}", { event: :delete, payload: status.id.to_s }.to_json) unless update
|
||||||
true
|
true
|
||||||
end
|
end
|
||||||
|
|
||||||
@@ -117,7 +117,7 @@ class FeedManager
|
|||||||
def unpush_from_list(list, status, update: false)
|
def unpush_from_list(list, status, update: false)
|
||||||
return false unless remove_from_feed(:list, list.id, status, aggregate_reblogs: list.account.user&.aggregates_reblogs?)
|
return false unless remove_from_feed(:list, list.id, status, aggregate_reblogs: list.account.user&.aggregates_reblogs?)
|
||||||
|
|
||||||
redis.publish("timeline:list:#{list.id}", Oj.dump(event: :delete, payload: status.id.to_s)) unless update
|
redis.publish("timeline:list:#{list.id}", { event: :delete, payload: status.id.to_s }.to_json) unless update
|
||||||
true
|
true
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|||||||
@@ -35,7 +35,7 @@ module Account::Suspensions
|
|||||||
|
|
||||||
# This terminates all connections for the given account with the streaming
|
# This terminates all connections for the given account with the streaming
|
||||||
# server:
|
# server:
|
||||||
redis.publish("timeline:system:#{id}", Oj.dump(event: :kill)) if local?
|
redis.publish("timeline:system:#{id}", { event: :kill }.to_json) if local?
|
||||||
end
|
end
|
||||||
|
|
||||||
def unsuspend!
|
def unsuspend!
|
||||||
|
|||||||
@@ -115,8 +115,8 @@ class CustomFilter < ApplicationRecord
|
|||||||
@should_invalidate_cache = false
|
@should_invalidate_cache = false
|
||||||
|
|
||||||
Rails.cache.delete("filters:v3:#{account_id}")
|
Rails.cache.delete("filters:v3:#{account_id}")
|
||||||
redis.publish("timeline:#{account_id}", Oj.dump(event: :filters_changed))
|
redis.publish("timeline:#{account_id}", { event: :filters_changed }.to_json)
|
||||||
redis.publish("timeline:system:#{account_id}", Oj.dump(event: :filters_changed))
|
redis.publish("timeline:system:#{account_id}", { event: :filters_changed }.to_json)
|
||||||
end
|
end
|
||||||
|
|
||||||
private
|
private
|
||||||
|
|||||||
@@ -173,7 +173,7 @@ class User < ApplicationRecord
|
|||||||
|
|
||||||
# This terminates all connections for the given account with the streaming
|
# This terminates all connections for the given account with the streaming
|
||||||
# server:
|
# server:
|
||||||
redis.publish("timeline:system:#{account.id}", Oj.dump(event: :kill))
|
redis.publish("timeline:system:#{account.id}", { event: :kill }.to_json)
|
||||||
end
|
end
|
||||||
|
|
||||||
def enable!
|
def enable!
|
||||||
@@ -347,7 +347,7 @@ class User < ApplicationRecord
|
|||||||
# Revoke each access token for the Streaming API, since `update_all``
|
# Revoke each access token for the Streaming API, since `update_all``
|
||||||
# doesn't trigger ActiveRecord Callbacks:
|
# doesn't trigger ActiveRecord Callbacks:
|
||||||
# TODO: #28793 Combine into a single topic
|
# TODO: #28793 Combine into a single topic
|
||||||
payload = Oj.dump(event: :kill)
|
payload = { event: :kill }.to_json
|
||||||
redis.pipelined do |pipeline|
|
redis.pipelined do |pipeline|
|
||||||
batch.ids.each do |id|
|
batch.ids.each do |id|
|
||||||
pipeline.publish("timeline:access_token:#{id}", payload)
|
pipeline.publish("timeline:access_token:#{id}", payload)
|
||||||
|
|||||||
@@ -82,7 +82,7 @@ class BatchedRemoveStatusService < BaseService
|
|||||||
def unpush_from_public_timelines(status, pipeline)
|
def unpush_from_public_timelines(status, pipeline)
|
||||||
return unless status.public_visibility? && status.id > @status_id_cutoff
|
return unless status.public_visibility? && status.id > @status_id_cutoff
|
||||||
|
|
||||||
payload = Oj.dump(event: :delete, payload: status.id.to_s)
|
payload = { event: :delete, payload: status.id.to_s }.to_json
|
||||||
|
|
||||||
pipeline.publish('timeline:public', payload)
|
pipeline.publish('timeline:public', payload)
|
||||||
pipeline.publish(status.local? ? 'timeline:public:local' : 'timeline:public:remote', payload)
|
pipeline.publish(status.local? ? 'timeline:public:local' : 'timeline:public:remote', payload)
|
||||||
|
|||||||
@@ -259,7 +259,7 @@ class NotifyService < BaseService
|
|||||||
end
|
end
|
||||||
|
|
||||||
def push_to_streaming_api!
|
def push_to_streaming_api!
|
||||||
redis.publish("timeline:#{@recipient.id}:notifications", Oj.dump(event: :notification, payload: InlineRenderer.render(@notification, @recipient, :notification)))
|
redis.publish("timeline:#{@recipient.id}:notifications", { event: :notification, payload: InlineRenderer.render(@notification, @recipient, :notification) }.to_json)
|
||||||
end
|
end
|
||||||
|
|
||||||
def subscribed_to_streaming_api?
|
def subscribed_to_streaming_api?
|
||||||
|
|||||||
@@ -14,7 +14,7 @@ class RemoveStatusService < BaseService
|
|||||||
# @option [Boolean] :original_removed
|
# @option [Boolean] :original_removed
|
||||||
# @option [Boolean] :skip_streaming
|
# @option [Boolean] :skip_streaming
|
||||||
def call(status, **options)
|
def call(status, **options)
|
||||||
@payload = Oj.dump(event: :delete, payload: status.id.to_s)
|
@payload = { event: :delete, payload: status.id.to_s }.to_json
|
||||||
@status = status
|
@status = status
|
||||||
@account = status.account
|
@account = status.account
|
||||||
@options = options
|
@options = options
|
||||||
|
|||||||
@@ -11,7 +11,7 @@ class PublishAnnouncementReactionWorker
|
|||||||
reaction ||= announcement.announcement_reactions.new(name: name)
|
reaction ||= announcement.announcement_reactions.new(name: name)
|
||||||
|
|
||||||
payload = InlineRenderer.render(reaction, nil, :reaction).tap { |h| h[:announcement_id] = announcement_id.to_s }
|
payload = InlineRenderer.render(reaction, nil, :reaction).tap { |h| h[:announcement_id] = announcement_id.to_s }
|
||||||
payload = Oj.dump(event: :'announcement.reaction', payload: payload)
|
payload = { event: :'announcement.reaction', payload: payload }
|
||||||
|
|
||||||
FeedManager.instance.with_active_accounts do |account|
|
FeedManager.instance.with_active_accounts do |account|
|
||||||
redis.publish("timeline:#{account.id}", payload) if redis.exists?("subscribed:timeline:#{account.id}")
|
redis.publish("timeline:#{account.id}", payload) if redis.exists?("subscribed:timeline:#{account.id}")
|
||||||
|
|||||||
@@ -12,7 +12,7 @@ class PublishScheduledAnnouncementWorker
|
|||||||
@announcement.publish! unless @announcement.published?
|
@announcement.publish! unless @announcement.published?
|
||||||
|
|
||||||
payload = InlineRenderer.render(@announcement, nil, :announcement)
|
payload = InlineRenderer.render(@announcement, nil, :announcement)
|
||||||
payload = Oj.dump(event: :announcement, payload: payload)
|
payload = { event: :announcement, payload: payload }.to_json
|
||||||
|
|
||||||
FeedManager.instance.with_active_accounts do |account|
|
FeedManager.instance.with_active_accounts do |account|
|
||||||
redis.publish("timeline:#{account.id}", payload) if redis.exists?("subscribed:timeline:#{account.id}")
|
redis.publish("timeline:#{account.id}", payload) if redis.exists?("subscribed:timeline:#{account.id}")
|
||||||
|
|||||||
@@ -9,7 +9,7 @@ class PushConversationWorker
|
|||||||
message = InlineRenderer.render(conversation, conversation.account, :conversation)
|
message = InlineRenderer.render(conversation, conversation.account, :conversation)
|
||||||
timeline_id = "timeline:direct:#{conversation.account_id}"
|
timeline_id = "timeline:direct:#{conversation.account_id}"
|
||||||
|
|
||||||
redis.publish(timeline_id, Oj.dump(event: :conversation, payload: message))
|
redis.publish(timeline_id, { event: :conversation, payload: message }.to_json)
|
||||||
rescue ActiveRecord::RecordNotFound
|
rescue ActiveRecord::RecordNotFound
|
||||||
true
|
true
|
||||||
end
|
end
|
||||||
|
|||||||
@@ -39,7 +39,7 @@ class UnfilterNotificationsWorker
|
|||||||
end
|
end
|
||||||
|
|
||||||
def push_streaming_event!
|
def push_streaming_event!
|
||||||
redis.publish("timeline:#{@recipient.id}:notifications", Oj.dump(event: :notifications_merged, payload: '1'))
|
redis.publish("timeline:#{@recipient.id}:notifications", { event: :notifications_merged, payload: '1' }.to_json)
|
||||||
end
|
end
|
||||||
|
|
||||||
def subscribed_to_streaming_api?
|
def subscribed_to_streaming_api?
|
||||||
|
|||||||
@@ -5,7 +5,7 @@ class UnpublishAnnouncementWorker
|
|||||||
include Redisable
|
include Redisable
|
||||||
|
|
||||||
def perform(announcement_id)
|
def perform(announcement_id)
|
||||||
payload = Oj.dump(event: :'announcement.delete', payload: announcement_id.to_s)
|
payload = { event: :'announcement.delete', payload: announcement_id.to_s }.to_json
|
||||||
|
|
||||||
FeedManager.instance.with_active_accounts do |account|
|
FeedManager.instance.with_active_accounts do |account|
|
||||||
redis.publish("timeline:#{account.id}", payload) if redis.exists?("subscribed:timeline:#{account.id}")
|
redis.publish("timeline:#{account.id}", payload) if redis.exists?("subscribed:timeline:#{account.id}")
|
||||||
|
|||||||
@@ -546,7 +546,7 @@ RSpec.describe FeedManager do
|
|||||||
allow(redis).to receive_messages(publish: nil)
|
allow(redis).to receive_messages(publish: nil)
|
||||||
subject.unpush_from_home(receiver, status)
|
subject.unpush_from_home(receiver, status)
|
||||||
|
|
||||||
deletion = Oj.dump(event: :delete, payload: status.id.to_s)
|
deletion = { event: :delete, payload: status.id.to_s }.to_json
|
||||||
expect(redis).to have_received(:publish).with("timeline:#{receiver.id}", deletion)
|
expect(redis).to have_received(:publish).with("timeline:#{receiver.id}", deletion)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|||||||
@@ -403,7 +403,7 @@ RSpec.describe User do
|
|||||||
expect(user).to have_attributes(disabled: true)
|
expect(user).to have_attributes(disabled: true)
|
||||||
|
|
||||||
expect(redis)
|
expect(redis)
|
||||||
.to have_received(:publish).with("timeline:system:#{user.account.id}", Oj.dump(event: :kill)).once
|
.to have_received(:publish).with("timeline:system:#{user.account.id}", { event: :kill }.to_json).once
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
@@ -445,7 +445,7 @@ RSpec.describe User do
|
|||||||
expect { web_push_subscription.reload }
|
expect { web_push_subscription.reload }
|
||||||
.to raise_error(ActiveRecord::RecordNotFound)
|
.to raise_error(ActiveRecord::RecordNotFound)
|
||||||
expect(redis_pipeline_stub)
|
expect(redis_pipeline_stub)
|
||||||
.to have_received(:publish).with("timeline:access_token:#{access_token.id}", Oj.dump(event: :kill)).once
|
.to have_received(:publish).with("timeline:access_token:#{access_token.id}", { event: :kill }.to_json).once
|
||||||
end
|
end
|
||||||
|
|
||||||
def remove_activated_sessions
|
def remove_activated_sessions
|
||||||
|
|||||||
@@ -222,7 +222,7 @@ RSpec.describe 'Filters' do
|
|||||||
|
|
||||||
expect(keyword.reload.keyword).to eq 'updated'
|
expect(keyword.reload.keyword).to eq 'updated'
|
||||||
|
|
||||||
expect(redis).to have_received(:publish).with("timeline:#{user.account.id}", Oj.dump(event: :filters_changed)).once
|
expect(redis).to have_received(:publish).with("timeline:#{user.account.id}", { event: :filters_changed }.to_json).once
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|||||||
@@ -40,7 +40,7 @@ RSpec.describe RemoveStatusService, :inline_jobs do
|
|||||||
.to_not include(status.id)
|
.to_not include(status.id)
|
||||||
|
|
||||||
expect(redis)
|
expect(redis)
|
||||||
.to have_received(:publish).with('timeline:public:media', Oj.dump(event: :delete, payload: status.id.to_s))
|
.to have_received(:publish).with('timeline:public:media', { event: :delete, payload: status.id.to_s }.to_json)
|
||||||
|
|
||||||
expect(delete_delivery(hank, status))
|
expect(delete_delivery(hank, status))
|
||||||
.to have_been_made.once
|
.to have_been_made.once
|
||||||
|
|||||||
Reference in New Issue
Block a user