问题描述
我有一个类,我在 updateLiveSockets()
方法内每 30 秒从单个后台线程填充地图 liveSocketsByDatacenter
,然后我有一个方法 getNextSocket()
将被多个读取器线程调用以获取可用的活动套接字,该套接字使用相同的映射来获取此信息.
I have a class in which I am populating a map liveSocketsByDatacenter
from a single background thread every 30 seconds inside updateLiveSockets()
method and then I have a method getNextSocket()
which will be called by multiple reader threads to get a live socket available which uses the same map to get this information.
public class SocketManager {
private static final Random random = new Random();
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
private final AtomicReference<Map<Datacenters, List<SocketHolder>>> liveSocketsByDatacenter =
new AtomicReference<>(Collections.unmodifiableMap(new HashMap<>()));
private final ZContext ctx = new ZContext();
// Lazy Loaded Singleton Pattern
private static class Holder {
private static final SocketManager instance = new SocketManager();
}
public static SocketManager getInstance() {
return Holder.instance;
}
private SocketManager() {
connectToZMQSockets();
scheduler.scheduleAtFixedRate(new Runnable() {
public void run() {
updateLiveSockets();
}
}, 30, 30, TimeUnit.SECONDS);
}
// during startup, making a connection and populate once
private void connectToZMQSockets() {
Map<Datacenters, ImmutableList<String>> socketsByDatacenter = Utils.SERVERS;
// The map in which I put all the live sockets
Map<Datacenters, List<SocketHolder>> updatedLiveSocketsByDatacenter = new HashMap<>();
for (Map.Entry<Datacenters, ImmutableList<String>> entry : socketsByDatacenter.entrySet()) {
List<SocketHolder> addedColoSockets = connect(entry.getKey(), entry.getValue(), ZMQ.PUSH);
updatedLiveSocketsByDatacenter.put(entry.getKey(),
Collections.unmodifiableList(addedColoSockets));
}
// Update the map content
this.liveSocketsByDatacenter.set(Collections.unmodifiableMap(updatedLiveSocketsByDatacenter));
}
private List<SocketHolder> connect(Datacenters colo, List<String> addresses, int socketType) {
List<SocketHolder> socketList = new ArrayList<>();
for (String address : addresses) {
try {
Socket client = ctx.createSocket(socketType);
// Set random identity to make tracing easier
String identity = String.format("%04X-%04X", random.nextInt(), random.nextInt());
client.setIdentity(identity.getBytes(ZMQ.CHARSET));
client.setTCPKeepAlive(1);
client.setSendTimeOut(7);
client.setLinger(0);
client.connect(address);
SocketHolder zmq = new SocketHolder(client, ctx, address, true);
socketList.add(zmq);
} catch (Exception ex) {
// log error
}
}
return socketList;
}
// this method will be called by multiple threads to get the next live socket
// is there any concurrency or thread safety issue or race condition here?
public Optional<SocketHolder> getNextSocket() {
// For the sake of consistency make sure to use the same map instance
// in the whole implementation of my method by getting my entries
// from the local variable instead of the member variable
Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter =
this.liveSocketsByDatacenter.get();
Optional<SocketHolder> liveSocket = Optional.absent();
List<Datacenters> dcs = Datacenters.getOrderedDatacenters();
for (Datacenters dc : dcs) {
liveSocket = getLiveSocket(liveSocketsByDatacenter.get(dc));
if (liveSocket.isPresent()) {
break;
}
}
return liveSocket;
}
// is there any concurrency or thread safety issue or race condition here?
private Optional<SocketHolder> getLiveSocketX(final List<SocketHolder> endpoints) {
if (!CollectionUtils.isEmpty(endpoints)) {
// The list of live sockets
List<SocketHolder> liveOnly = new ArrayList<>(endpoints.size());
for (SocketHolder obj : endpoints) {
if (obj.isLive()) {
liveOnly.add(obj);
}
}
if (!liveOnly.isEmpty()) {
// The list is not empty so we shuffle it an return the first element
Collections.shuffle(liveOnly);
return Optional.of(liveOnly.get(0));
}
}
return Optional.absent();
}
// Added the modifier synchronized to prevent concurrent modification
// it is needed because to build the new map we first need to get the
// old one so both must be done atomically to prevent concistency issues
private synchronized void updateLiveSockets() {
Map<Datacenters, ImmutableList<String>> socketsByDatacenter = Utils.SERVERS;
// Initialize my new map with the current map content
Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter =
new HashMap<>(this.liveSocketsByDatacenter.get());
for (Entry<Datacenters, ImmutableList<String>> entry : socketsByDatacenter.entrySet()) {
List<SocketHolder> liveSockets = liveSocketsByDatacenter.get(entry.getKey());
List<SocketHolder> liveUpdatedSockets = new ArrayList<>();
for (SocketHolder liveSocket : liveSockets) { // LINE A
Socket socket = liveSocket.getSocket();
String endpoint = liveSocket.getEndpoint();
Map<byte[], byte[]> holder = populateMap();
Message message = new Message(holder, Partition.COMMAND);
boolean status = SendToSocket.getInstance().execute(message.getAdd(), holder, socket);
boolean isLive = (status) ? true : false;
// is there any problem the way I am using `SocketHolder` class?
SocketHolder zmq = new SocketHolder(socket, liveSocket.getContext(), endpoint, isLive);
liveUpdatedSockets.add(zmq);
}
liveSocketsByDatacenter.put(entry.getKey(),
Collections.unmodifiableList(liveUpdatedSockets));
}
this.liveSocketsByDatacenter.set(Collections.unmodifiableMap(liveSocketsByDatacenter));
}
}
正如你在我的课堂上看到的那样:
As you can see in my class:
- 从每 30 秒运行一次的单个后台线程,我用
updateLiveSockets()
方法中的所有活动套接字填充liveSocketsByDatacenter
映射. - 然后从多个线程中,我调用
getNextSocket()
方法给我一个可用的活动套接字,它使用liveSocketsByDatacenter
映射来获取所需的信息.李>
- From a single background thread which runs every 30 seconds, I populate
liveSocketsByDatacenter
map with all the live sockets inupdateLiveSockets()
method. - And then from multiple threads, I call the
getNextSocket()
method to give me a live socket available which uses aliveSocketsByDatacenter
map to get the required information.
我的代码运行良好,没有任何问题,我想看看是否有更好或更有效的方法来编写它.我还想就线程安全问题或任何竞争条件(如果有的话)发表意见,但到目前为止我还没有看到任何问题,但我可能是错的.
I have my code working fine without any issues and wanted to see if there is any better or more efficient way to write this. I also wanted to get an opinion on thread safety issues or any race conditions if any are there, but so far I haven't seen any but I could be wrong.
我最担心的是 updateLiveSockets()
方法和 getLiveSocketX()
方法.我在 LINE A 迭代 liveSockets
这是 SocketHolder
的 List
然后创建一个新的 SocketHolder
对象并添加到另一个新列表.这里可以吗?
I am mostly worried about updateLiveSockets()
method and getLiveSocketX()
method. I am iterating liveSockets
which is a List
of SocketHolder
at LINE A and then making a new SocketHolder
object and adding to another new list. Is this ok here?
注意: SocketHolder
是一个不可变的类.你可以忽略我拥有的 ZeroMQ
东西.
Note: SocketHolder
is an immutable class. And you can ignore ZeroMQ
stuff I have.
推荐答案
您使用以下同步技术.
- 带有实时套接字数据的地图位于原子引用后面,这允许安全地切换地图.
updateLiveSockets()
方法是同步的(隐含在此),这将防止两个线程同时切换地图.- 如果在
getNextSocket()
方法期间发生切换,请在使用地图时对地图进行本地引用以避免混淆.
- The map with live socket data is behind an atomic reference, this allows safely switching the map.
- The
updateLiveSockets()
method is synchronized (implicitly on this), this will prevent switching the map by two threads simultaneously. - You make a local reference to the map when using it to avoid mixups if the switch happens during the
getNextSocket()
method.
它像现在一样是线程安全的吗?
线程安全始终取决于共享可变数据是否正确同步.在这种情况下,共享的可变数据是数据中心到其 SocketHolders 列表的映射.
Thread safety always hinges on whether there is proper synchronization on shared mutable data. In this case the shared mutable data is the map of datacenters to their list of SocketHolders.
地图位于AtomicReference
中,并且制作本地副本以供使用这一事实足以在地图上进行同步.您的方法采用地图的一个版本并使用它,由于 AtomicReference
的性质,切换版本是线程安全的.这也可以通过将成员字段设置为地图 volatile
来实现,因为您所做的只是更新引用(您无需对其执行任何检查然后执行操作).
The fact that the map is in an AtomicReference
, and making a local copy for use is enough synchronization on the map. Your methods take a version of the map and use that, switching versions is thread safe due to the nature of AtomicReference
. This could also have been achieved with just making the member field for the map volatile
, as all you do is update the reference (you don't do any check-then-act operations on it).
由于 scheduleAtFixedRate()
保证传递的 Runnable
不会与自身并发运行,所以 updateLiveSockets() 上的
synchronized
不是必需的,但是,它也不会造成任何真正的伤害.
As scheduleAtFixedRate()
guarantees that the passed Runnable
will not be run concurrently with itself, the synchronized
on updateLiveSockets()
is not needed, however, it also doesn't do any real harm.
所以是的,这个类是线程安全的,因为它是.
So yes, this class is thread safe, as it is.
但是,SocketHolder
是否可以被多个线程同时使用并不完全清楚.事实上,这个类只是试图通过选择一个随机的活动来最小化 SocketHolder
的并发使用(尽管不需要打乱整个数组来选择一个随机索引).它实际上并没有阻止并发使用.
However, it's not entirely clear if a SocketHolder
can be used by multiple threads simultaneously. As it is, this class just tries to minimize concurrent use of SocketHolder
s by picking a random live one (no need to shuffle the entire array to pick one random index though). It does nothing to actually prevent concurrent use.
可以提高效率吗?
我相信它可以.查看 updateLiveSockets()
方法时,它似乎构建了完全相同的映射,除了 SocketHolder
可能具有不同的 isLive
标志.这使我得出结论,与其切换整个地图,我只想切换地图中的每个列表.为了以线程安全的方式更改映射中的条目,我可以使用 ConcurrentHashMap
.
I believe it can. When looking at the updateLiveSockets()
method, it seems it builds the exact same map, except that the SocketHolder
s may have different values for the isLive
flag. This leads me to conclude that, rather than switching the entire map, i just want to switch each of the lists in the map. And for changing entries in a map in a thread safe manner, I can just use ConcurrentHashMap
.
如果我使用 ConcurrentHashMap
,并且不切换映射,而是切换映射中的值,我可以摆脱 AtomicReference
.
If I use a ConcurrentHashMap
, and don't switch the map, but rather, the values in the map, I can get rid of the AtomicReference
.
要更改映射,我可以构建新列表并将其直接放入地图中.这样更高效,因为我发布数据更快,创建的对象更少,而我的同步只是建立在现成的组件上,这有利于可读性.
To change the mapping I can just build the new list and put it straight into the map. This is more efficient, as I publish data sooner, and I create fewer objects, while my synchronization just builds on ready made components, which benefits readability.
这是我的构建(为了简洁,省略了一些不太相关的部分)
Here's my build (omitted some parts that were less relevant, for brevity)
public class SocketManager {
private static final Random random = new Random();
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
private final Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter = new ConcurrentHashMap<>(); // use ConcurrentHashMap
private final ZContext ctx = new ZContext();
// ...
private SocketManager() {
connectToZMQSockets();
scheduler.scheduleAtFixedRate(this::updateLiveSockets, 30, 30, TimeUnit.SECONDS);
}
// during startup, making a connection and populate once
private void connectToZMQSockets() {
Map<Datacenters, List<String>> socketsByDatacenter = Utils.SERVERS;
for (Map.Entry<Datacenters, List<String>> entry : socketsByDatacenter.entrySet()) {
List<SocketHolder> addedColoSockets = connect(entry.getValue(), ZMQ.PUSH);
liveSocketsByDatacenter.put(entry.getKey(), addedColoSockets); // we can put it straight into the map
}
}
// ...
// this method will be called by multiple threads to get the next live socket
// is there any concurrency or thread safety issue or race condition here?
public Optional<SocketHolder> getNextSocket() {
for (Datacenters dc : Datacenters.getOrderedDatacenters()) {
Optional<SocketHolder> liveSocket = getLiveSocket(liveSocketsByDatacenter.get(dc)); // no more need for a local copy, ConcurrentHashMap, makes sure I get the latest mapped List<SocketHolder>
if (liveSocket.isPresent()) {
return liveSocket;
}
}
return Optional.absent();
}
// is there any concurrency or thread safety issue or race condition here?
private Optional<SocketHolder> getLiveSocket(final List<SocketHolder> listOfEndPoints) {
if (!CollectionUtils.isEmpty(listOfEndPoints)) {
// The list of live sockets
List<SocketHolder> liveOnly = new ArrayList<>(listOfEndPoints.size());
for (SocketHolder obj : listOfEndPoints) {
if (obj.isLive()) {
liveOnly.add(obj);
}
}
if (!liveOnly.isEmpty()) {
// The list is not empty so we shuffle it an return the first element
return Optional.of(liveOnly.get(random.nextInt(liveOnly.size()))); // just pick one
}
}
return Optional.absent();
}
// no need to make this synchronized
private void updateLiveSockets() {
Map<Datacenters, List<String>> socketsByDatacenter = Utils.SERVERS;
for (Map.Entry<Datacenters, List<String>> entry : socketsByDatacenter.entrySet()) {
List<SocketHolder> liveSockets = liveSocketsByDatacenter.get(entry.getKey());
List<SocketHolder> liveUpdatedSockets = new ArrayList<>();
for (SocketHolder liveSocket : liveSockets) { // LINE A
Socket socket = liveSocket.getSocket();
String endpoint = liveSocket.getEndpoint();
Map<byte[], byte[]> holder = populateMap();
Message message = new Message(holder, Partition.COMMAND);
boolean status = SendToSocket.getInstance().execute(message.getAdd(), holder, socket);
boolean isLive = (status) ? true : false;
SocketHolder zmq = new SocketHolder(socket, liveSocket.getContext(), endpoint, isLive);
liveUpdatedSockets.add(zmq);
}
liveSocketsByDatacenter.put(entry.getKey(), Collections.unmodifiableList(liveUpdatedSockets)); // just put it straigth into the map, the mapping will be updated in a thread safe manner.
}
}
}
这篇关于在单个后台线程定期修改它的同时读取 Map的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持跟版网!